You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kl...@apache.org on 2022/02/07 08:38:05 UTC

[hive] branch master updated: HIVE-25926: Move all logging from AcidMetricService to AcidMetricLogger (Viktor Csomor, reviewed by Karen Coppage)

This is an automated email from the ASF dual-hosted git repository.

klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 29c0e81  HIVE-25926: Move all logging from AcidMetricService to AcidMetricLogger (Viktor Csomor, reviewed by Karen Coppage)
29c0e81 is described below

commit 29c0e81d938dbc7d56a600476c06ed9a6cba6298
Author: Viktor Csomor <cs...@gmail.com>
AuthorDate: Mon Feb 7 09:37:53 2022 +0100

    HIVE-25926: Move all logging from AcidMetricService to AcidMetricLogger (Viktor Csomor, reviewed by Karen Coppage)
    
    The common logic required by the `AcidMetricLogger` and the `AcidMetricService` had been extracted to a package-private component `AcidMetricData`.
    This change enabled to move the logging from AcidMetricService to AcidMetricLogger.
    
    Added methods:
    - logMultipleWorkerVersions
    - logFailedCompactionsPercentage
    - logOldestInitiatorAge
    
    Tests added.
    
    Closes #2995.
---
 .../ql/txn/compactor/TestCompactionMetrics.java    |   1 -
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |   3 +-
 .../hive/metastore/metrics/AcidMetricLogger.java   |  52 ++-
 .../hive/metastore/metrics/AcidMetricService.java  | 149 ++------
 .../metastore/metrics/CompactionMetricData.java    | 210 +++++++++++
 .../metrics/TestCompactionMetricData.java          | 390 +++++++++++++++++++++
 .../TestMultipleWorkerVersionDetection.java        | 123 -------
 7 files changed, 678 insertions(+), 250 deletions(-)

diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
index eea0c3b..07a3212 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.ThrowingTxnHandler;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index aa6b18d..ae23b2f 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -608,8 +608,7 @@ public class MetastoreConf {
       "hive.metastore.compactor.worker.detect.multiple.versions.threshold", 24, TimeUnit.HOURS,
       "Defines a time-window in hours from the current time backwards\n," +
             "in which a warning is being raised if multiple worker version are detected.\n" +
-            "The setting has no effect if the metastore.metrics.enabled is disabled \n" +
-            "or the metastore.acidmetrics.thread.on is turned off."),
+            "The setting has no effect if the metastore.compactor.acid.metrics.logger.frequency is 0."),
     COMPACTOR_MINOR_STATS_COMPRESSION(
         "metastore.compactor.enable.stats.compression",
         "metastore.compactor.enable.stats.compression", true,
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricLogger.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricLogger.java
index e8ad33b..35450a4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricLogger.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricLogger.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.metastore.metrics;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.CompactionMetricsData;
 import org.apache.hadoop.hive.metastore.txn.MetricsInfo;
@@ -56,7 +58,7 @@ public class AcidMetricLogger implements MetastoreTaskThread {
   public void run() {
     try {
       logDbMetrics();
-      logDeltaMetrics();
+      logMetrics();
     } catch (MetaException e) {
       LOG.warn("Caught exception while trying to log acid metrics data.", e);
     }
@@ -74,6 +76,17 @@ public class AcidMetricLogger implements MetastoreTaskThread {
     return conf;
   }
 
+  private void logMetrics() throws MetaException {
+    ShowCompactResponse response = txnHandler.showCompact(new ShowCompactRequest());
+    CompactionMetricData metricData = CompactionMetricData.of(response.getCompacts());
+
+    logMultipleWorkerVersions(metricData);
+    logFailedCompactionsPercentage(metricData);
+    logOldestInitiatorAge(metricData);
+
+    logDeltaMetrics();
+  }
+
   private void logDeltaMetrics() throws MetaException {
     List<CompactionMetricsData> deltas = txnHandler.getTopCompactionMetricsDataPerType(maxCacheSize);
     deltas.stream().filter(d -> d.getMetricType() == NUM_DELTAS).forEach(d -> LOG.warn(
@@ -89,6 +102,42 @@ public class AcidMetricLogger implements MetastoreTaskThread {
         AcidMetricService.getDeltaCountKey(d.getDbName(), d.getTblName(), d.getPartitionName()), d.getMetricValue())));
   }
 
+  private void logOldestInitiatorAge(CompactionMetricData metricData) {
+    int oldestInitiatorAge = (int) ((System.currentTimeMillis() - metricData.getOldestEnqueueTime()) / 1000L);
+    String oldestInitiatorMessage = "Found compaction entry in compaction queue with an age of {} seconds. " +
+        "Consider increasing the number of worker threads.";
+    long oldestInitiatedWarningThreshold = MetastoreConf.getTimeVar(conf,
+        MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_WARNING,
+        TimeUnit.SECONDS);
+    long oldestInitiatedErrorThreshold = MetastoreConf.getTimeVar(conf,
+        MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_ERROR,
+        TimeUnit.SECONDS);
+    if (oldestInitiatorAge >= oldestInitiatedErrorThreshold) {
+      LOG.error(oldestInitiatorMessage, oldestInitiatorAge);
+    } else if (oldestInitiatorAge >= oldestInitiatedWarningThreshold) {
+      LOG.warn(oldestInitiatorMessage, oldestInitiatorAge);
+    }
+  }
+
+  private void logMultipleWorkerVersions(CompactionMetricData metricData) {
+    long workerVersionThresholdInMillis = MetastoreConf.getTimeVar(conf,
+        MetastoreConf.ConfVars.COMPACTOR_WORKER_DETECT_MULTIPLE_VERSION_THRESHOLD, TimeUnit.MILLISECONDS);
+    List<String> versions = metricData
+        .allWorkerVersionsSince(System.currentTimeMillis() - workerVersionThresholdInMillis);
+
+    if (versions.size() > 1) {
+      LOG.warn("Multiple Compaction Worker versions detected: {}", versions);
+    }
+  }
+
+  private void logFailedCompactionsPercentage(CompactionMetricData metricData) {
+    Double failedCompactionPercentage = metricData.getFailedCompactionPercentage();
+    if (failedCompactionPercentage != null &&
+        (failedCompactionPercentage >=
+            MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_FAILED_COMPACTION_RATIO_THRESHOLD))) {
+      LOG.warn("Many compactions are failing. Check root cause of failed/not initiated compactions.");
+    }
+  }
   private void logDbMetrics() throws MetaException {
     MetricsInfo metrics = txnHandler.getMetricsInfo();
     if (metrics.getTxnToWriteIdCount() >= MetastoreConf.getIntVar(conf,
@@ -170,4 +219,3 @@ public class AcidMetricLogger implements MetastoreTaskThread {
     }
   }
 }
-
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java
index 898efb2..8ce09ea 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.metastore.metrics;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.AcidConstants;
@@ -33,7 +32,6 @@ import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.CompactionMetricsData;
 import org.apache.hadoop.hive.metastore.txn.MetricsInfo;
@@ -46,15 +44,13 @@ import org.slf4j.LoggerFactory;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static org.apache.hadoop.hive.metastore.HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION;
 import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_DELTAS;
 import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_INITIATORS;
 import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_INITIATOR_VERSIONS;
@@ -121,9 +117,7 @@ public class AcidMetricService implements MetastoreTaskThread {
       }
       long startedAt = System.currentTimeMillis();
         try {
-          ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
-          detectMultipleWorkerVersions(currentCompactions);
-          updateMetrics(currentCompactions);
+          updateMetrics();
           updateDeltaMetrics();
         } catch (Exception ex) {
          LOG.error("Caught exception in AcidMetricService loop", ex);
@@ -296,38 +290,12 @@ public class AcidMetricService implements MetastoreTaskThread {
     }
   }
 
-  private void detectMultipleWorkerVersions(ShowCompactResponse currentCompactions) {
-    long workerVersionThresholdInMillis = MetastoreConf.getTimeVar(conf,
-        MetastoreConf.ConfVars.COMPACTOR_WORKER_DETECT_MULTIPLE_VERSION_THRESHOLD, TimeUnit.MILLISECONDS);
-    long since = System.currentTimeMillis() - workerVersionThresholdInMillis;
-
-    List<String> versions = collectWorkerVersions(currentCompactions.getCompacts(), since);
-    if (versions.size() > 1) {
-      LOG.warn("Multiple Compaction Worker versions detected: {}", versions);
-    }
-  }
-
-  private void updateMetrics(ShowCompactResponse currentCompactions) throws MetaException {
+  private void updateMetrics() throws MetaException {
+    ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
     updateMetricsFromShowCompact(currentCompactions, conf);
     updateDBMetrics();
   }
 
-  @VisibleForTesting
-  public static List<String> collectWorkerVersions(List<ShowCompactResponseElement> currentCompacts, long since) {
-    return Optional.ofNullable(currentCompacts)
-      .orElseGet(ImmutableList::of)
-      .stream()
-      .filter(comp -> (comp.isSetEnqueueTime() && (comp.getEnqueueTime() >= since))
-        || (comp.isSetStart() && (comp.getStart() >= since))
-        || (comp.isSetEndTime() && (comp.getEndTime() >= since)))
-      .filter(comp -> !TxnStore.DID_NOT_INITIATE_RESPONSE.equals(comp.getState()))
-      .map(ShowCompactResponseElement::getWorkerVersion)
-      .filter(Objects::nonNull)
-      .distinct()
-      .sorted()
-      .collect(Collectors.toList());
-  }
-
   private void updateDBMetrics() throws MetaException {
     MetricsInfo metrics = txnHandler.getMetricsInfo();
     Metrics.getOrCreateGauge(NUM_TXN_TO_WRITEID).set(metrics.getTxnToWriteIdCount());
@@ -347,112 +315,49 @@ public class AcidMetricService implements MetastoreTaskThread {
     Metrics.getOrCreateGauge(OLDEST_READY_FOR_CLEANING_AGE).set(metrics.getOldestReadyForCleaningAge());
   }
 
-
-
   @VisibleForTesting
   public static void updateMetricsFromShowCompact(ShowCompactResponse showCompactResponse, Configuration conf) {
-    Map<String, ShowCompactResponseElement> lastElements = new HashMap<>();
-    long oldestEnqueueTime = Long.MAX_VALUE;
-    long oldestWorkingTime = Long.MAX_VALUE;
-    long oldestCleaningTime = Long.MAX_VALUE;
-
-    // Get the last compaction for each db/table/partition
-    for(ShowCompactResponseElement element : showCompactResponse.getCompacts()) {
-      String key = element.getDbname() + "/" + element.getTablename() +
-          (element.getPartitionname() != null ? "/" + element.getPartitionname() : "");
-
-      // If new key, add the element, if there is an existing one, change to the element if the element.id is greater than old.id
-      lastElements.compute(key, (k, old) -> (old == null) ? element : (element.getId() > old.getId() ? element : old));
-
-      // find the oldest elements with initiated and working states
-      String state = element.getState();
-      if (TxnStore.INITIATED_RESPONSE.equals(state) && (oldestEnqueueTime > element.getEnqueueTime())) {
-        oldestEnqueueTime = element.getEnqueueTime();
-      }
-
-      if (element.isSetStart()) {
-        if (TxnStore.WORKING_RESPONSE.equals(state) && (oldestWorkingTime > element.getStart())) {
-          oldestWorkingTime = element.getStart();
-        }
-      }
-
-      if (element.isSetCleanerStart()) {
-        if (TxnStore.CLEANING_RESPONSE.equals(state) && (oldestCleaningTime > element.getCleanerStart())) {
-          oldestCleaningTime = element.getCleanerStart();
-        }
-      }
-    }
+    CompactionMetricData metricData = CompactionMetricData.of(showCompactResponse.getCompacts());
 
     // Get the current count for each state
-    Map<String, Long> counts = lastElements.values().stream()
-        .collect(Collectors.groupingBy(ShowCompactResponseElement::getState, Collectors.counting()));
+    Map<String, Long> counts = metricData.getStateCount();
 
     // Update metrics
     for (int i = 0; i < TxnStore.COMPACTION_STATES.length; ++i) {
       String key = COMPACTION_STATUS_PREFIX + replaceWhitespace(TxnStore.COMPACTION_STATES[i]);
       Long count = counts.get(TxnStore.COMPACTION_STATES[i]);
       if (count != null) {
-        Metrics.getOrCreateGauge(key).set(count.intValue());
+        Metrics.getOrCreateGauge(key)
+            .set(count.intValue());
       } else {
-        Metrics.getOrCreateGauge(key).set(0);
+        Metrics.getOrCreateGauge(key)
+            .set(0);
       }
     }
 
-    Long numFailedComp = counts.get(TxnStore.FAILED_RESPONSE);
-    Long numNotInitiatedComp = counts.get(TxnStore.DID_NOT_INITIATE_RESPONSE);
-    Long numSucceededComp = counts.get(TxnStore.SUCCEEDED_RESPONSE);
-    if (numFailedComp != null && numNotInitiatedComp != null && numSucceededComp != null &&
-        ((numFailedComp + numNotInitiatedComp) / (numFailedComp + numNotInitiatedComp + numSucceededComp) >
-      MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_FAILED_COMPACTION_RATIO_THRESHOLD))) {
-      LOG.warn("Many compactions are failing. Check root cause of failed/not initiated compactions.");
-    }
+    updateOldestCompactionMetric(COMPACTION_OLDEST_ENQUEUE_AGE, metricData.getOldestEnqueueTime(), conf);
+    updateOldestCompactionMetric(COMPACTION_OLDEST_WORKING_AGE, metricData.getOldestWorkingTime(), conf);
+    updateOldestCompactionMetric(COMPACTION_OLDEST_CLEANING_AGE, metricData.getOldestCleaningTime(), conf);
 
-    updateOldestCompactionMetric(COMPACTION_OLDEST_ENQUEUE_AGE, oldestEnqueueTime, conf,
-        "Found compaction entry in compaction queue with an age of {} seconds. " +
-            "Consider increasing the number of worker threads.",
-        MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_WARNING,
-        MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_ERROR);
-    updateOldestCompactionMetric(COMPACTION_OLDEST_WORKING_AGE, oldestWorkingTime, conf);
-    updateOldestCompactionMetric(COMPACTION_OLDEST_CLEANING_AGE, oldestCleaningTime, conf);
-
-    long initiatorsCount = lastElements.values().stream()
-        //manually initiated compactions don't count
-        .filter(e -> !MANUALLY_INITIATED_COMPACTION.equals(getThreadIdFromId(e.getInitiatorId())))
-        .map(e -> getHostFromId(e.getInitiatorId())).distinct().filter(e -> !NO_VAL.equals(e)).count();
-    Metrics.getOrCreateGauge(COMPACTION_NUM_INITIATORS).set((int) initiatorsCount);
-    long workersCount = lastElements.values().stream()
-        .map(e -> getHostFromId(e.getWorkerid())).distinct().filter(e -> !NO_VAL.equals(e)).count();
-    Metrics.getOrCreateGauge(COMPACTION_NUM_WORKERS).set((int) workersCount);
-
-    long initiatorVersionsCount = lastElements.values().stream()
-        .map(ShowCompactResponseElement::getInitiatorVersion).distinct().filter(Objects::nonNull).count();
-    Metrics.getOrCreateGauge(COMPACTION_NUM_INITIATOR_VERSIONS).set((int) initiatorVersionsCount);
-    long workerVersionsCount = lastElements.values().stream()
-        .map(ShowCompactResponseElement::getWorkerVersion).distinct().filter(Objects::nonNull).count();
-    Metrics.getOrCreateGauge(COMPACTION_NUM_WORKER_VERSIONS).set((int) workerVersionsCount);
-  }
+    Metrics.getOrCreateGauge(COMPACTION_NUM_INITIATORS)
+        .set((int) metricData.getInitiatorsCount());
+    Metrics.getOrCreateGauge(COMPACTION_NUM_WORKERS)
+        .set((int) metricData.getWorkersCount());
 
-  private static void updateOldestCompactionMetric(String metricName, long oldestTime, Configuration conf) {
-    updateOldestCompactionMetric(metricName, oldestTime, conf, null, null, null);
+    Metrics.getOrCreateGauge(COMPACTION_NUM_INITIATOR_VERSIONS)
+        .set((int) metricData.getInitiatorVersionsCount());
+    Metrics.getOrCreateGauge(COMPACTION_NUM_WORKER_VERSIONS)
+        .set((int) metricData.getWorkerVersionsCount());
   }
 
-  private static void updateOldestCompactionMetric(String metricName, long oldestTime, Configuration conf,
-      String logMessage, MetastoreConf.ConfVars warningThreshold, MetastoreConf.ConfVars errorThreshold) {
-    if (oldestTime == Long.MAX_VALUE) {
+  private static void updateOldestCompactionMetric(String metricName, Long oldestTime, Configuration conf) {
+    if (oldestTime == null) {
       Metrics.getOrCreateGauge(metricName)
           .set(0);
-      return;
-    }
-
-    int oldestAge = (int) ((System.currentTimeMillis() - oldestTime) / 1000L);
-    Metrics.getOrCreateGauge(metricName)
-        .set(oldestAge);
-    if (logMessage != null) {
-      if (oldestAge >= MetastoreConf.getTimeVar(conf, errorThreshold, TimeUnit.SECONDS)) {
-        LOG.error(logMessage, oldestAge);
-      } else if (oldestAge >= MetastoreConf.getTimeVar(conf, warningThreshold, TimeUnit.SECONDS)) {
-        LOG.warn(logMessage, oldestAge);
-      }
+    } else {
+      int oldestAge = (int) ((System.currentTimeMillis() - oldestTime) / 1000L);
+      Metrics.getOrCreateGauge(metricName)
+          .set(oldestAge);
     }
   }
 
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/CompactionMetricData.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/CompactionMetricData.java
new file mode 100644
index 0000000..81eee8d
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/CompactionMetricData.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.metrics;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.metastore.HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.NO_VAL;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getHostFromId;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getThreadIdFromId;
+
+final class CompactionMetricData {
+
+  private static final Long OLDEST_TIME_NO_VALUE = Long.MAX_VALUE;
+
+  private final List<ShowCompactResponseElement> compacts;
+
+  private long oldestEnqueueTime;
+  private long oldestWorkingTime;
+  private long oldestCleaningTime;
+
+  private Map<String, Long> stateCount;
+
+  private Double failedCompactionPercentage;
+
+  private long initiatorsCount;
+  private long initiatorVersionsCount;
+  private long workersCount;
+  private long workerVersionsCount;
+
+  private CompactionMetricData(List<ShowCompactResponseElement> compacts) {
+    this.compacts = compacts;
+  }
+
+  static CompactionMetricData of(List<ShowCompactResponseElement> compacts) {
+    CompactionMetricData data = new CompactionMetricData(Optional.ofNullable(compacts)
+        .orElseGet(ImmutableList::of));
+    data.init();
+    return data;
+  }
+
+  private void init() {
+    final Map<String, ShowCompactResponseElement> lastElements = new HashMap<>();
+
+    oldestEnqueueTime = OLDEST_TIME_NO_VALUE;
+    oldestWorkingTime = OLDEST_TIME_NO_VALUE;
+    oldestCleaningTime = OLDEST_TIME_NO_VALUE;
+    for (ShowCompactResponseElement element : compacts) {
+      final String key = element.getDbname() + "/" + element.getTablename() +
+          (element.getPartitionname() != null ? "/" + element.getPartitionname() : "");
+
+      // If new key, add the element, if there is an existing one, change to the element if the element.id is greater than old.id
+      lastElements.compute(key, (k, old) -> (old == null) ? element : (element.getId() > old.getId() ? element : old));
+
+      // find the oldest elements with initiated and working states
+      String state = element.getState();
+      if (TxnStore.INITIATED_RESPONSE.equals(state) && (oldestEnqueueTime > element.getEnqueueTime())) {
+        oldestEnqueueTime = element.getEnqueueTime();
+      }
+
+      if (element.isSetStart()) {
+        if (TxnStore.WORKING_RESPONSE.equals(state) && (oldestWorkingTime > element.getStart())) {
+          oldestWorkingTime = element.getStart();
+        }
+      }
+
+      if (element.isSetCleanerStart()) {
+        if (TxnStore.CLEANING_RESPONSE.equals(state) && (oldestCleaningTime > element.getCleanerStart())) {
+          oldestCleaningTime = element.getCleanerStart();
+        }
+      }
+    }
+
+    stateCount = lastElements
+        .values()
+        .stream()
+        .collect(Collectors.groupingBy(ShowCompactResponseElement::getState, Collectors.counting()));
+
+    failedCompactionPercentage = calculateFailedPercentage(stateCount);
+
+    initiatorsCount = lastElements.values()
+        .stream()
+        //manually initiated compactions don't count
+        .filter(e -> !MANUALLY_INITIATED_COMPACTION.equals(getThreadIdFromId(e.getInitiatorId())))
+        .map(e -> getHostFromId(e.getInitiatorId()))
+        .filter(e -> !NO_VAL.equals(e))
+        .distinct()
+        .count();
+    initiatorVersionsCount = lastElements.values()
+        .stream()
+        .map(ShowCompactResponseElement::getInitiatorVersion)
+        .filter(Objects::nonNull)
+        .distinct()
+        .count();
+
+    workersCount = lastElements.values()
+        .stream()
+        .map(e -> getHostFromId(e.getWorkerid()))
+        .filter(e -> !NO_VAL.equals(e))
+        .distinct()
+        .count();
+    workerVersionsCount = lastElements.values()
+        .stream()
+        .map(ShowCompactResponseElement::getWorkerVersion)
+        .filter(Objects::nonNull)
+        .distinct()
+        .count();
+  }
+
+  List<String> allWorkerVersionsSince(long since) {
+    return compacts.stream()
+        .filter(comp -> (comp.isSetEnqueueTime() && (comp.getEnqueueTime() >= since))
+            || (comp.isSetStart() && (comp.getStart() >= since))
+            || (comp.isSetEndTime() && (comp.getEndTime() >= since)))
+        .filter(comp -> !TxnStore.DID_NOT_INITIATE_RESPONSE.equals(comp.getState()))
+        .map(ShowCompactResponseElement::getWorkerVersion)
+        .filter(Objects::nonNull)
+        .distinct()
+        .sorted()
+        .collect(Collectors.toList());
+  }
+
+  Map<String, Long> getStateCount() {
+    return new HashMap<>(stateCount);
+  }
+
+  Long getOldestEnqueueTime() {
+    return nullIfNotSet(oldestEnqueueTime);
+  }
+
+  Long getOldestWorkingTime() {
+    return nullIfNotSet(oldestWorkingTime);
+  }
+
+  Long getOldestCleaningTime() {
+    return nullIfNotSet(oldestCleaningTime);
+  }
+
+  Double getFailedCompactionPercentage() {
+    return failedCompactionPercentage;
+  }
+
+  long getInitiatorsCount() {
+    return initiatorsCount;
+  }
+
+  long getInitiatorVersionsCount() {
+    return initiatorVersionsCount;
+  }
+
+  long getWorkersCount() {
+    return workersCount;
+  }
+
+  long getWorkerVersionsCount() {
+    return workerVersionsCount;
+  }
+
+  private static Long nullIfNotSet(long value) {
+    if (value == OLDEST_TIME_NO_VALUE) {
+      return null;
+    }
+    return value;
+  }
+
+  private static Double calculateFailedPercentage(Map<String, Long> stateCount) {
+    long failed = unwrapToPrimitive(stateCount.get(TxnStore.FAILED_RESPONSE));
+    long notInitiated = unwrapToPrimitive(stateCount.get(TxnStore.DID_NOT_INITIATE_RESPONSE));
+    long succeeded = unwrapToPrimitive(stateCount.get(TxnStore.SUCCEEDED_RESPONSE));
+
+    long denominator = failed + notInitiated + succeeded;
+    if (denominator > 0) {
+      long numerator = failed + notInitiated;
+      return Long.valueOf(numerator).doubleValue() / Long.valueOf(denominator).doubleValue();
+    }
+
+    return null;
+  }
+
+  private static long unwrapToPrimitive(Long value) {
+    if (value == null) {
+      return 0L;
+    }
+    return value;
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestCompactionMetricData.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestCompactionMetricData.java
new file mode 100644
index 0000000..aa603bd
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestCompactionMetricData.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.metrics;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Collections;
+import java.util.UUID;
+
+import static java.util.Collections.emptyMap;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(MetastoreUnitTest.class)
+public class TestCompactionMetricData {
+
+  private static final long SINCE_EPOCH = 0L;
+
+  private static final String INITIATED = "initiated";
+  private static final String WORKING = "working";
+  private static final String READY_FOR_CLEANING = "ready for cleaning";
+  private static final String DID_NOT_INITIATE = "did not initiate";
+  private static final String SUCCEEDED = "succeeded";
+  private static final String FAILED = "failed";
+
+  @Test
+  public void testStateCountsCountedCorrectly() {
+    assertThat(
+        CompactionMetricData.of(null)
+            .getStateCount(),
+        is(emptyMap()));
+
+    assertThat(
+        CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction(1, "t1", null, INITIATED, CompactionType.MINOR),
+                    aCompaction(2, "t1", null, INITIATED, CompactionType.MAJOR),
+                    aCompaction(3, "t2", "part1", WORKING, CompactionType.MINOR),
+                    aCompaction(5, "t2", "part1", WORKING, CompactionType.MAJOR),
+                    aCompaction(6, "t2", "part2", WORKING, CompactionType.MAJOR),
+                    aCompaction(7, "t3", null, READY_FOR_CLEANING, CompactionType.MAJOR)
+                ))
+            .getStateCount(),
+        is(ImmutableMap.of(
+            INITIATED, 1L,
+            WORKING, 2L,
+            READY_FOR_CLEANING, 1L
+        )));
+  }
+
+  @Test
+  public void testOldestEnqueuedValueCalculatedCorrectly() {
+    assertThat(
+        CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction(3, "t2", "part1", WORKING, CompactionType.MINOR, 90L, null, null)
+                ))
+            .getOldestEnqueueTime(),
+        nullValue());
+
+    assertThat(
+        CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction(1, "t1", null, INITIATED, CompactionType.MINOR, 150L, null, null),
+                    aCompaction(2, "t1", null, INITIATED, CompactionType.MAJOR, 100L, null, null),
+                    aCompaction(3, "t2", "part1", WORKING, CompactionType.MINOR, 90L, null, null),
+                    aCompaction(6, "t2", "part2", DID_NOT_INITIATE, CompactionType.MAJOR, 50L, null, null),
+                    aCompaction(7, "t3", null, READY_FOR_CLEANING, CompactionType.MAJOR, 300L, null, null)
+                ))
+            .getOldestEnqueueTime(),
+        is(100L));
+  }
+
+  @Test
+  public void testOldestWorkingValueCalculatedCorrectly() {
+    assertThat(
+        CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction(3, "t2", "part1", INITIATED, CompactionType.MINOR, null, 90L, null)
+                ))
+            .getOldestWorkingTime(),
+        nullValue());
+
+    assertThat(
+        CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction(1, "t1", null, INITIATED, CompactionType.MINOR, null, 150L, null),
+                    aCompaction(2, "t1", null, INITIATED, CompactionType.MAJOR, null, 100L, null),
+                    aCompaction(4, "t2", "part1", WORKING, CompactionType.MINOR, null, 90L, null),
+                    aCompaction(3, "t2", "part1", WORKING, CompactionType.MINOR, null, 70L, null),
+                    aCompaction(6, "t2", "part2", DID_NOT_INITIATE, CompactionType.MAJOR, null, 50L, null),
+                    aCompaction(7, "t3", null, READY_FOR_CLEANING, CompactionType.MAJOR, null, 300L, null)
+                ))
+            .getOldestWorkingTime(),
+        is(70L));
+  }
+
+  @Test
+  public void testOldestCleaningValueCalculatedCorrectly() {
+    assertThat(
+        CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction(3, "t2", "part1", INITIATED, CompactionType.MINOR, null, null, 90L)
+                ))
+            .getOldestCleaningTime(),
+        nullValue());
+
+    assertThat(
+        CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction(1, "t1", null, INITIATED, CompactionType.MINOR, null, null, 150L),
+                    aCompaction(2, "t1", null, READY_FOR_CLEANING, CompactionType.MAJOR, null, null, 100L),
+                    aCompaction(4, "t2", "part1", WORKING, CompactionType.MINOR, null, null, 90L),
+                    aCompaction(3, "t2", "part1", WORKING, CompactionType.MINOR, null, null, 70L),
+                    aCompaction(7, "t3", null, READY_FOR_CLEANING, CompactionType.MAJOR, null, null, 300L)
+                ))
+            .getOldestCleaningTime(),
+        is(100L));
+  }
+
+  @Test
+  public void testFailedPercentageCalculatedCorrectly() {
+    assertThat(
+        CompactionMetricData.of(
+                ImmutableList.of())
+            .getFailedCompactionPercentage(),
+        nullValue());
+
+    assertThat(
+        CompactionMetricData.of(
+                ImmutableList.of(aCompaction(1, "t1", "p1", SUCCEEDED, CompactionType.MINOR)))
+            .getFailedCompactionPercentage(),
+        is(0.0D));
+
+    assertThat(
+        CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction(1, "t1", "p1", FAILED, CompactionType.MINOR),
+                    aCompaction(2, "t2", "p1", SUCCEEDED, CompactionType.MINOR)))
+            .getFailedCompactionPercentage(),
+        is(0.5D));
+
+    assertThat(
+        CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction(1, "t1", "p1", FAILED, CompactionType.MINOR),
+                    aCompaction(2, "t2", "p1", DID_NOT_INITIATE, CompactionType.MINOR),
+                    aCompaction(3, "t3", "p1", SUCCEEDED, CompactionType.MINOR),
+                    aCompaction(4, "t4", "p1", SUCCEEDED, CompactionType.MINOR)))
+            .getFailedCompactionPercentage(),
+        is(0.5D));
+
+    assertThat(
+        CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction(1, "t1", "p1", FAILED, CompactionType.MINOR),
+                    aCompaction(2, "t2", "p1", DID_NOT_INITIATE, CompactionType.MINOR)))
+            .getFailedCompactionPercentage(),
+        is(1.0D));
+
+    assertThat(
+        CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction(1, "t1", "p1", FAILED, CompactionType.MINOR)))
+            .getFailedCompactionPercentage(),
+        is(1.0D));
+
+    assertThat(
+        CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction(1, "t1", "p1", DID_NOT_INITIATE, CompactionType.MINOR)))
+            .getFailedCompactionPercentage(),
+        is(1.0D));
+  }
+
+  @Test
+  public void testInitiatorCountCalculatedCorrectly() {
+    assertThat(CompactionMetricData.of(Collections.emptyList())
+            .getInitiatorsCount(),
+        is(0L));
+    assertThat(CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction(1, null, null, null, (String) null),
+                    aCompaction(2, "host1-initiator", null, null, (String) null),
+                    aCompaction(3, "host2-initiator-manual", null, null, (String) null),
+                    aCompaction(4, "host3-initiator", null, null, (String) null)))
+            .getInitiatorsCount(),
+        is(2L));
+  }
+
+  @Test
+  public void testInitiatorVersionsCalculatedCorrectly() {
+    assertThat(CompactionMetricData.of(Collections.emptyList())
+            .getInitiatorVersionsCount(),
+        is(0L));
+    assertThat(CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction(1, null, "1.0", null, (String) null),
+                    aCompaction(2, null, "3.0", null, (String) null),
+                    aCompaction(3, null, "4.0", null, (String) null),
+                    aCompaction(4, null, null, null, (String) null),
+                    aCompaction(5, null, "4.0", null, (String) null)))
+            .getInitiatorVersionsCount(),
+        is(3L));
+  }
+
+  @Test
+  public void testWorkerCountCalculatedCorrectly() {
+    assertThat(CompactionMetricData.of(Collections.emptyList())
+            .getWorkersCount(),
+        is(0L));
+    assertThat(CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction(1, null, null, null, "4.0"),
+                    aCompaction(2, null, null, "host1-worker", "4.0"),
+                    aCompaction(3, null, null, "host2-worker", "4.0")))
+            .getWorkersCount(),
+        is(2L));
+  }
+
+  @Test
+  public void testWorkerVersionsCalculatedCorrectly() {
+    assertThat(CompactionMetricData.of(Collections.emptyList())
+            .getWorkerVersionsCount(),
+        is(0L));
+    assertThat(CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction(1, null, null, null, "1.0"),
+                    aCompaction(2, null, null, null, "3.0"),
+                    aCompaction(3, null, null, null, "4.0"),
+                    aCompaction(4, null, null, null, (String) null),
+                    aCompaction(5, null, null, null, "4.0")))
+            .getWorkerVersionsCount(),
+        is(3L));
+  }
+
+  @Test
+  public void testCollectWorkerVersionsEmptyLists() {
+    assertThat(CompactionMetricData.of(Collections.emptyList()).allWorkerVersionsSince(SINCE_EPOCH),
+
+        is(Collections.emptyList()));
+  }
+
+  @Test
+  public void testCollectWorkerVersionsDidNotInitiateGettingFilteredOut() {
+    assertThat(CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction("DoNotShowUp", DID_NOT_INITIATE, 1L, 1L, 1L),
+                    aCompaction("1.0", INITIATED, 1L, 1L, 1L)))
+            .allWorkerVersionsSince(SINCE_EPOCH),
+
+        is(Collections.singletonList("1.0"))
+    );
+  }
+
+  @Test
+  public void testCollectWorkerVersionsNullVersionGettingFilteredOut() {
+    assertThat(CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction(null, INITIATED, 1L, 1L, 1L)))
+            .allWorkerVersionsSince(SINCE_EPOCH),
+
+        is(Collections.emptyList())
+    );
+  }
+
+  @Test
+  public void testCollectWorkerVersionsTimeThreshold() {
+    assertThat(CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction("0.0-not-shown", INITIATED, 99L, null, null),
+                    aCompaction("0.1-not-shown", INITIATED, 99L, 99L, null),
+                    aCompaction("0.2-not-shown", INITIATED, 99L, 99L, 99L),
+
+                    aCompaction("1.0", INITIATED, 100L, null, null),
+                    aCompaction("1.1", WORKING, 99L, 100L, null),
+                    aCompaction("1.2", SUCCEEDED, 99L, 99L, 100L)
+                ))
+            .allWorkerVersionsSince(100),
+
+        is(ImmutableList.of("1.0", "1.1", "1.2"))
+    );
+  }
+
+  @Test
+  public void testCollectWorkerVersionsSortedAndAvoidDuplicates() {
+    assertThat(CompactionMetricData.of(
+                ImmutableList.of(
+                    aCompaction("2.0", INITIATED, 1L, null, null),
+                    aCompaction("2.1", INITIATED, 1L, null, null),
+                    aCompaction("2.10", INITIATED, 1L, null, null),
+                    aCompaction("2.2", INITIATED, 1L, null, null),
+                    aCompaction("3.0", WORKING, 1L, null, null),
+                    aCompaction("1.0", INITIATED, 1L, null, null),
+                    aCompaction("1.0", WORKING, 1L, null, null)
+                ))
+            .allWorkerVersionsSince(SINCE_EPOCH),
+
+        is(ImmutableList.of("1.0", "2.0", "2.1", "2.10", "2.2", "3.0"))
+    );
+  }
+
+  private static ShowCompactResponseElement aCompaction(long id, String table, String partition, String state,
+      CompactionType type) {
+    ShowCompactResponseElement e = new ShowCompactResponseElement("db_name", table, type, state);
+    e.setId(id);
+    e.setPartitionname(partition);
+    return e;
+  }
+
+  private static ShowCompactResponseElement aCompaction(long id,
+      String initiatorId, String initiatorVersion,
+      String workerId, String workerVersion) {
+    ShowCompactResponseElement e = new ShowCompactResponseElement("db_name", UUID.randomUUID().toString(),
+        CompactionType.MAJOR, INITIATED);
+    e.setId(id);
+
+    e.setInitiatorId(initiatorId);
+    e.setInitiatorVersion(initiatorVersion);
+
+    e.setWorkerid(workerId);
+    e.setWorkerVersion(workerVersion);
+
+    return e;
+  }
+
+  private static ShowCompactResponseElement aCompaction(long id, String table, String partition, String state,
+      CompactionType type, Long enqueuedTime, Long startTime, Long cleanerStart) {
+    ShowCompactResponseElement e = new ShowCompactResponseElement("db_name", table, type, state);
+    e.setId(id);
+    e.setPartitionname(partition);
+
+    if (enqueuedTime != null) {
+      e.setEnqueueTime(enqueuedTime);
+    }
+
+    if (startTime != null) {
+      e.setStart(startTime);
+    }
+
+    if (cleanerStart != null) {
+      e.setCleanerStart(cleanerStart);
+    }
+
+    return e;
+  }
+
+  private static ShowCompactResponseElement aCompaction(String workerVersion, String state,
+      Long enqueuedTime, Long startTime, Long endTime) {
+
+    ShowCompactResponseElement e = new ShowCompactResponseElement("db_name", "table_name", CompactionType.MINOR, state);
+    e.setWorkerVersion(workerVersion);
+
+    if (enqueuedTime != null) {
+      e.setEnqueueTime(enqueuedTime);
+    }
+
+    if (startTime != null) {
+      e.setStart(startTime);
+    }
+
+    if (endTime != null) {
+      e.setEndTime(endTime);
+    }
+
+    return e;
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestMultipleWorkerVersionDetection.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestMultipleWorkerVersionDetection.java
deleted file mode 100644
index 1476f4d..0000000
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestMultipleWorkerVersionDetection.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.metrics;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.hamcrest.CoreMatchers;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.util.Collections;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-
-@Category(MetastoreUnitTest.class)
-public class TestMultipleWorkerVersionDetection {
-
-  private final long SINCE_EPOCH = 0L;
-
-  @Test
-  public void testCollectWorkerVersionsEmptyLists() {
-    assertThat(AcidMetricService.collectWorkerVersions(null, SINCE_EPOCH), CoreMatchers.is(Collections.emptyList()));
-    assertThat(AcidMetricService.collectWorkerVersions(Collections.emptyList(), SINCE_EPOCH),
-        CoreMatchers.is(Collections.emptyList()));
-  }
-
-  @Test
-  public void testCollectWorkerVersionsDidNotInitiateGettingFilteredOut() {
-    assertThat(AcidMetricService.collectWorkerVersions(
-            ImmutableList.of(
-                showCompactResponse("DoNotShowUp", "did not initiate", 1L, 1L, 1L),
-                showCompactResponse("1.0", "initiated", 1L, 1L, 1L)),
-            SINCE_EPOCH),
-
-        CoreMatchers.is(Collections.singletonList("1.0"))
-    );
-  }
-
-  @Test
-  public void testCollectWorkerVersionsNullVersionGettingFilteredOut() {
-    assertThat(AcidMetricService.collectWorkerVersions(
-            ImmutableList.of(
-                showCompactResponse(null, "initiated", 1L, 1L, 1L)),
-            SINCE_EPOCH),
-
-        CoreMatchers.is(Collections.emptyList())
-    );
-  }
-
-  @Test
-  public void testCollectWorkerVersionsTimeThreshold() {
-    assertThat(AcidMetricService.collectWorkerVersions(
-            ImmutableList.of(
-                showCompactResponse("0.0-not-shown", "initiated", 99L, null, null),
-                showCompactResponse("0.1-not-shown", "initiated", 99L, 99L, null),
-                showCompactResponse("0.2-not-shown", "initiated", 99L, 99L, 99L),
-
-                showCompactResponse("1.0", "initiated", 100L, null, null),
-                showCompactResponse("1.1", "working", 99L, 100L, null),
-                showCompactResponse("1.2", "succeeded", 99L, 99L, 100L)
-            ),
-            100),
-
-        CoreMatchers.is(ImmutableList.of("1.0", "1.1", "1.2"))
-    );
-  }
-
-  @Test
-  public void testCollectWorkerVersionsSortedAndAvoidDuplicates() {
-    assertThat(AcidMetricService.collectWorkerVersions(
-            ImmutableList.of(
-                showCompactResponse("2.0", "initiated", 1L, null, null),
-                showCompactResponse("2.1", "initiated", 1L, null, null),
-                showCompactResponse("2.10", "initiated", 1L, null, null),
-                showCompactResponse("2.2", "initiated", 1L, null, null),
-                showCompactResponse("3.0", "working", 1L, null, null),
-                showCompactResponse("1.0", "initiated", 1L, null, null),
-                showCompactResponse("1.0", "working", 1L, null, null)
-            ),
-            SINCE_EPOCH),
-
-        CoreMatchers.is(ImmutableList.of("1.0", "2.0", "2.1", "2.10", "2.2", "3.0"))
-    );
-  }
-
-  private static ShowCompactResponseElement showCompactResponse(String workerVersion, String state,
-      Long enqueuedTime, Long startTime, Long endTime) {
-
-    ShowCompactResponseElement e = new ShowCompactResponseElement("db_name", "table_name", CompactionType.MINOR, state);
-    e.setWorkerVersion(workerVersion);
-
-    if (enqueuedTime != null) {
-      e.setEnqueueTime(enqueuedTime);
-    }
-
-    if (startTime != null) {
-      e.setStart(startTime);
-    }
-
-    if (endTime != null) {
-      e.setEndTime(endTime);
-    }
-
-    return e;
-  }
-}