You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kl...@apache.org on 2022/02/07 08:38:05 UTC
[hive] branch master updated: HIVE-25926: Move all logging from AcidMetricService to AcidMetricLogger (Viktor Csomor, reviewed by Karen Coppage)
This is an automated email from the ASF dual-hosted git repository.
klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 29c0e81 HIVE-25926: Move all logging from AcidMetricService to AcidMetricLogger (Viktor Csomor, reviewed by Karen Coppage)
29c0e81 is described below
commit 29c0e81d938dbc7d56a600476c06ed9a6cba6298
Author: Viktor Csomor <cs...@gmail.com>
AuthorDate: Mon Feb 7 09:37:53 2022 +0100
HIVE-25926: Move all logging from AcidMetricService to AcidMetricLogger (Viktor Csomor, reviewed by Karen Coppage)
The common logic required by the `AcidMetricLogger` and the `AcidMetricService` had been extracted to a package-private component `AcidMetricData`.
This change enabled to move the logging from AcidMetricService to AcidMetricLogger.
Added methods:
- logMultipleWorkerVersions
- logFailedCompactionsPercentage
- logOldestInitiatorAge
Tests added.
Closes #2995.
---
.../ql/txn/compactor/TestCompactionMetrics.java | 1 -
.../hadoop/hive/metastore/conf/MetastoreConf.java | 3 +-
.../hive/metastore/metrics/AcidMetricLogger.java | 52 ++-
.../hive/metastore/metrics/AcidMetricService.java | 149 ++------
.../metastore/metrics/CompactionMetricData.java | 210 +++++++++++
.../metrics/TestCompactionMetricData.java | 390 +++++++++++++++++++++
.../TestMultipleWorkerVersionDetection.java | 123 -------
7 files changed, 678 insertions(+), 250 deletions(-)
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
index eea0c3b..07a3212 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.ThrowingTxnHandler;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index aa6b18d..ae23b2f 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -608,8 +608,7 @@ public class MetastoreConf {
"hive.metastore.compactor.worker.detect.multiple.versions.threshold", 24, TimeUnit.HOURS,
"Defines a time-window in hours from the current time backwards\n," +
"in which a warning is being raised if multiple worker version are detected.\n" +
- "The setting has no effect if the metastore.metrics.enabled is disabled \n" +
- "or the metastore.acidmetrics.thread.on is turned off."),
+ "The setting has no effect if the metastore.compactor.acid.metrics.logger.frequency is 0."),
COMPACTOR_MINOR_STATS_COMPRESSION(
"metastore.compactor.enable.stats.compression",
"metastore.compactor.enable.stats.compression", true,
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricLogger.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricLogger.java
index e8ad33b..35450a4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricLogger.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricLogger.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.metastore.metrics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.CompactionMetricsData;
import org.apache.hadoop.hive.metastore.txn.MetricsInfo;
@@ -56,7 +58,7 @@ public class AcidMetricLogger implements MetastoreTaskThread {
public void run() {
try {
logDbMetrics();
- logDeltaMetrics();
+ logMetrics();
} catch (MetaException e) {
LOG.warn("Caught exception while trying to log acid metrics data.", e);
}
@@ -74,6 +76,17 @@ public class AcidMetricLogger implements MetastoreTaskThread {
return conf;
}
+ private void logMetrics() throws MetaException {
+ ShowCompactResponse response = txnHandler.showCompact(new ShowCompactRequest());
+ CompactionMetricData metricData = CompactionMetricData.of(response.getCompacts());
+
+ logMultipleWorkerVersions(metricData);
+ logFailedCompactionsPercentage(metricData);
+ logOldestInitiatorAge(metricData);
+
+ logDeltaMetrics();
+ }
+
private void logDeltaMetrics() throws MetaException {
List<CompactionMetricsData> deltas = txnHandler.getTopCompactionMetricsDataPerType(maxCacheSize);
deltas.stream().filter(d -> d.getMetricType() == NUM_DELTAS).forEach(d -> LOG.warn(
@@ -89,6 +102,42 @@ public class AcidMetricLogger implements MetastoreTaskThread {
AcidMetricService.getDeltaCountKey(d.getDbName(), d.getTblName(), d.getPartitionName()), d.getMetricValue())));
}
+ private void logOldestInitiatorAge(CompactionMetricData metricData) {
+ int oldestInitiatorAge = (int) ((System.currentTimeMillis() - metricData.getOldestEnqueueTime()) / 1000L);
+ String oldestInitiatorMessage = "Found compaction entry in compaction queue with an age of {} seconds. " +
+ "Consider increasing the number of worker threads.";
+ long oldestInitiatedWarningThreshold = MetastoreConf.getTimeVar(conf,
+ MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_WARNING,
+ TimeUnit.SECONDS);
+ long oldestInitiatedErrorThreshold = MetastoreConf.getTimeVar(conf,
+ MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_ERROR,
+ TimeUnit.SECONDS);
+ if (oldestInitiatorAge >= oldestInitiatedErrorThreshold) {
+ LOG.error(oldestInitiatorMessage, oldestInitiatorAge);
+ } else if (oldestInitiatorAge >= oldestInitiatedWarningThreshold) {
+ LOG.warn(oldestInitiatorMessage, oldestInitiatorAge);
+ }
+ }
+
+ private void logMultipleWorkerVersions(CompactionMetricData metricData) {
+ long workerVersionThresholdInMillis = MetastoreConf.getTimeVar(conf,
+ MetastoreConf.ConfVars.COMPACTOR_WORKER_DETECT_MULTIPLE_VERSION_THRESHOLD, TimeUnit.MILLISECONDS);
+ List<String> versions = metricData
+ .allWorkerVersionsSince(System.currentTimeMillis() - workerVersionThresholdInMillis);
+
+ if (versions.size() > 1) {
+ LOG.warn("Multiple Compaction Worker versions detected: {}", versions);
+ }
+ }
+
+ private void logFailedCompactionsPercentage(CompactionMetricData metricData) {
+ Double failedCompactionPercentage = metricData.getFailedCompactionPercentage();
+ if (failedCompactionPercentage != null &&
+ (failedCompactionPercentage >=
+ MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_FAILED_COMPACTION_RATIO_THRESHOLD))) {
+ LOG.warn("Many compactions are failing. Check root cause of failed/not initiated compactions.");
+ }
+ }
private void logDbMetrics() throws MetaException {
MetricsInfo metrics = txnHandler.getMetricsInfo();
if (metrics.getTxnToWriteIdCount() >= MetastoreConf.getIntVar(conf,
@@ -170,4 +219,3 @@ public class AcidMetricLogger implements MetastoreTaskThread {
}
}
}
-
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java
index 898efb2..8ce09ea 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.metastore.metrics;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.AcidConstants;
@@ -33,7 +32,6 @@ import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.CompactionMetricsData;
import org.apache.hadoop.hive.metastore.txn.MetricsInfo;
@@ -46,15 +44,13 @@ import org.slf4j.LoggerFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static org.apache.hadoop.hive.metastore.HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION;
import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_DELTAS;
import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_INITIATORS;
import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_INITIATOR_VERSIONS;
@@ -121,9 +117,7 @@ public class AcidMetricService implements MetastoreTaskThread {
}
long startedAt = System.currentTimeMillis();
try {
- ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
- detectMultipleWorkerVersions(currentCompactions);
- updateMetrics(currentCompactions);
+ updateMetrics();
updateDeltaMetrics();
} catch (Exception ex) {
LOG.error("Caught exception in AcidMetricService loop", ex);
@@ -296,38 +290,12 @@ public class AcidMetricService implements MetastoreTaskThread {
}
}
- private void detectMultipleWorkerVersions(ShowCompactResponse currentCompactions) {
- long workerVersionThresholdInMillis = MetastoreConf.getTimeVar(conf,
- MetastoreConf.ConfVars.COMPACTOR_WORKER_DETECT_MULTIPLE_VERSION_THRESHOLD, TimeUnit.MILLISECONDS);
- long since = System.currentTimeMillis() - workerVersionThresholdInMillis;
-
- List<String> versions = collectWorkerVersions(currentCompactions.getCompacts(), since);
- if (versions.size() > 1) {
- LOG.warn("Multiple Compaction Worker versions detected: {}", versions);
- }
- }
-
- private void updateMetrics(ShowCompactResponse currentCompactions) throws MetaException {
+ private void updateMetrics() throws MetaException {
+ ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
updateMetricsFromShowCompact(currentCompactions, conf);
updateDBMetrics();
}
- @VisibleForTesting
- public static List<String> collectWorkerVersions(List<ShowCompactResponseElement> currentCompacts, long since) {
- return Optional.ofNullable(currentCompacts)
- .orElseGet(ImmutableList::of)
- .stream()
- .filter(comp -> (comp.isSetEnqueueTime() && (comp.getEnqueueTime() >= since))
- || (comp.isSetStart() && (comp.getStart() >= since))
- || (comp.isSetEndTime() && (comp.getEndTime() >= since)))
- .filter(comp -> !TxnStore.DID_NOT_INITIATE_RESPONSE.equals(comp.getState()))
- .map(ShowCompactResponseElement::getWorkerVersion)
- .filter(Objects::nonNull)
- .distinct()
- .sorted()
- .collect(Collectors.toList());
- }
-
private void updateDBMetrics() throws MetaException {
MetricsInfo metrics = txnHandler.getMetricsInfo();
Metrics.getOrCreateGauge(NUM_TXN_TO_WRITEID).set(metrics.getTxnToWriteIdCount());
@@ -347,112 +315,49 @@ public class AcidMetricService implements MetastoreTaskThread {
Metrics.getOrCreateGauge(OLDEST_READY_FOR_CLEANING_AGE).set(metrics.getOldestReadyForCleaningAge());
}
-
-
@VisibleForTesting
public static void updateMetricsFromShowCompact(ShowCompactResponse showCompactResponse, Configuration conf) {
- Map<String, ShowCompactResponseElement> lastElements = new HashMap<>();
- long oldestEnqueueTime = Long.MAX_VALUE;
- long oldestWorkingTime = Long.MAX_VALUE;
- long oldestCleaningTime = Long.MAX_VALUE;
-
- // Get the last compaction for each db/table/partition
- for(ShowCompactResponseElement element : showCompactResponse.getCompacts()) {
- String key = element.getDbname() + "/" + element.getTablename() +
- (element.getPartitionname() != null ? "/" + element.getPartitionname() : "");
-
- // If new key, add the element, if there is an existing one, change to the element if the element.id is greater than old.id
- lastElements.compute(key, (k, old) -> (old == null) ? element : (element.getId() > old.getId() ? element : old));
-
- // find the oldest elements with initiated and working states
- String state = element.getState();
- if (TxnStore.INITIATED_RESPONSE.equals(state) && (oldestEnqueueTime > element.getEnqueueTime())) {
- oldestEnqueueTime = element.getEnqueueTime();
- }
-
- if (element.isSetStart()) {
- if (TxnStore.WORKING_RESPONSE.equals(state) && (oldestWorkingTime > element.getStart())) {
- oldestWorkingTime = element.getStart();
- }
- }
-
- if (element.isSetCleanerStart()) {
- if (TxnStore.CLEANING_RESPONSE.equals(state) && (oldestCleaningTime > element.getCleanerStart())) {
- oldestCleaningTime = element.getCleanerStart();
- }
- }
- }
+ CompactionMetricData metricData = CompactionMetricData.of(showCompactResponse.getCompacts());
// Get the current count for each state
- Map<String, Long> counts = lastElements.values().stream()
- .collect(Collectors.groupingBy(ShowCompactResponseElement::getState, Collectors.counting()));
+ Map<String, Long> counts = metricData.getStateCount();
// Update metrics
for (int i = 0; i < TxnStore.COMPACTION_STATES.length; ++i) {
String key = COMPACTION_STATUS_PREFIX + replaceWhitespace(TxnStore.COMPACTION_STATES[i]);
Long count = counts.get(TxnStore.COMPACTION_STATES[i]);
if (count != null) {
- Metrics.getOrCreateGauge(key).set(count.intValue());
+ Metrics.getOrCreateGauge(key)
+ .set(count.intValue());
} else {
- Metrics.getOrCreateGauge(key).set(0);
+ Metrics.getOrCreateGauge(key)
+ .set(0);
}
}
- Long numFailedComp = counts.get(TxnStore.FAILED_RESPONSE);
- Long numNotInitiatedComp = counts.get(TxnStore.DID_NOT_INITIATE_RESPONSE);
- Long numSucceededComp = counts.get(TxnStore.SUCCEEDED_RESPONSE);
- if (numFailedComp != null && numNotInitiatedComp != null && numSucceededComp != null &&
- ((numFailedComp + numNotInitiatedComp) / (numFailedComp + numNotInitiatedComp + numSucceededComp) >
- MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_FAILED_COMPACTION_RATIO_THRESHOLD))) {
- LOG.warn("Many compactions are failing. Check root cause of failed/not initiated compactions.");
- }
+ updateOldestCompactionMetric(COMPACTION_OLDEST_ENQUEUE_AGE, metricData.getOldestEnqueueTime(), conf);
+ updateOldestCompactionMetric(COMPACTION_OLDEST_WORKING_AGE, metricData.getOldestWorkingTime(), conf);
+ updateOldestCompactionMetric(COMPACTION_OLDEST_CLEANING_AGE, metricData.getOldestCleaningTime(), conf);
- updateOldestCompactionMetric(COMPACTION_OLDEST_ENQUEUE_AGE, oldestEnqueueTime, conf,
- "Found compaction entry in compaction queue with an age of {} seconds. " +
- "Consider increasing the number of worker threads.",
- MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_WARNING,
- MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_ERROR);
- updateOldestCompactionMetric(COMPACTION_OLDEST_WORKING_AGE, oldestWorkingTime, conf);
- updateOldestCompactionMetric(COMPACTION_OLDEST_CLEANING_AGE, oldestCleaningTime, conf);
-
- long initiatorsCount = lastElements.values().stream()
- //manually initiated compactions don't count
- .filter(e -> !MANUALLY_INITIATED_COMPACTION.equals(getThreadIdFromId(e.getInitiatorId())))
- .map(e -> getHostFromId(e.getInitiatorId())).distinct().filter(e -> !NO_VAL.equals(e)).count();
- Metrics.getOrCreateGauge(COMPACTION_NUM_INITIATORS).set((int) initiatorsCount);
- long workersCount = lastElements.values().stream()
- .map(e -> getHostFromId(e.getWorkerid())).distinct().filter(e -> !NO_VAL.equals(e)).count();
- Metrics.getOrCreateGauge(COMPACTION_NUM_WORKERS).set((int) workersCount);
-
- long initiatorVersionsCount = lastElements.values().stream()
- .map(ShowCompactResponseElement::getInitiatorVersion).distinct().filter(Objects::nonNull).count();
- Metrics.getOrCreateGauge(COMPACTION_NUM_INITIATOR_VERSIONS).set((int) initiatorVersionsCount);
- long workerVersionsCount = lastElements.values().stream()
- .map(ShowCompactResponseElement::getWorkerVersion).distinct().filter(Objects::nonNull).count();
- Metrics.getOrCreateGauge(COMPACTION_NUM_WORKER_VERSIONS).set((int) workerVersionsCount);
- }
+ Metrics.getOrCreateGauge(COMPACTION_NUM_INITIATORS)
+ .set((int) metricData.getInitiatorsCount());
+ Metrics.getOrCreateGauge(COMPACTION_NUM_WORKERS)
+ .set((int) metricData.getWorkersCount());
- private static void updateOldestCompactionMetric(String metricName, long oldestTime, Configuration conf) {
- updateOldestCompactionMetric(metricName, oldestTime, conf, null, null, null);
+ Metrics.getOrCreateGauge(COMPACTION_NUM_INITIATOR_VERSIONS)
+ .set((int) metricData.getInitiatorVersionsCount());
+ Metrics.getOrCreateGauge(COMPACTION_NUM_WORKER_VERSIONS)
+ .set((int) metricData.getWorkerVersionsCount());
}
- private static void updateOldestCompactionMetric(String metricName, long oldestTime, Configuration conf,
- String logMessage, MetastoreConf.ConfVars warningThreshold, MetastoreConf.ConfVars errorThreshold) {
- if (oldestTime == Long.MAX_VALUE) {
+ private static void updateOldestCompactionMetric(String metricName, Long oldestTime, Configuration conf) {
+ if (oldestTime == null) {
Metrics.getOrCreateGauge(metricName)
.set(0);
- return;
- }
-
- int oldestAge = (int) ((System.currentTimeMillis() - oldestTime) / 1000L);
- Metrics.getOrCreateGauge(metricName)
- .set(oldestAge);
- if (logMessage != null) {
- if (oldestAge >= MetastoreConf.getTimeVar(conf, errorThreshold, TimeUnit.SECONDS)) {
- LOG.error(logMessage, oldestAge);
- } else if (oldestAge >= MetastoreConf.getTimeVar(conf, warningThreshold, TimeUnit.SECONDS)) {
- LOG.warn(logMessage, oldestAge);
- }
+ } else {
+ int oldestAge = (int) ((System.currentTimeMillis() - oldestTime) / 1000L);
+ Metrics.getOrCreateGauge(metricName)
+ .set(oldestAge);
}
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/CompactionMetricData.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/CompactionMetricData.java
new file mode 100644
index 0000000..81eee8d
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/CompactionMetricData.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.metrics;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.metastore.HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.NO_VAL;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getHostFromId;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getThreadIdFromId;
+
+final class CompactionMetricData {
+
+ private static final Long OLDEST_TIME_NO_VALUE = Long.MAX_VALUE;
+
+ private final List<ShowCompactResponseElement> compacts;
+
+ private long oldestEnqueueTime;
+ private long oldestWorkingTime;
+ private long oldestCleaningTime;
+
+ private Map<String, Long> stateCount;
+
+ private Double failedCompactionPercentage;
+
+ private long initiatorsCount;
+ private long initiatorVersionsCount;
+ private long workersCount;
+ private long workerVersionsCount;
+
+ private CompactionMetricData(List<ShowCompactResponseElement> compacts) {
+ this.compacts = compacts;
+ }
+
+ static CompactionMetricData of(List<ShowCompactResponseElement> compacts) {
+ CompactionMetricData data = new CompactionMetricData(Optional.ofNullable(compacts)
+ .orElseGet(ImmutableList::of));
+ data.init();
+ return data;
+ }
+
+ private void init() {
+ final Map<String, ShowCompactResponseElement> lastElements = new HashMap<>();
+
+ oldestEnqueueTime = OLDEST_TIME_NO_VALUE;
+ oldestWorkingTime = OLDEST_TIME_NO_VALUE;
+ oldestCleaningTime = OLDEST_TIME_NO_VALUE;
+ for (ShowCompactResponseElement element : compacts) {
+ final String key = element.getDbname() + "/" + element.getTablename() +
+ (element.getPartitionname() != null ? "/" + element.getPartitionname() : "");
+
+ // If new key, add the element, if there is an existing one, change to the element if the element.id is greater than old.id
+ lastElements.compute(key, (k, old) -> (old == null) ? element : (element.getId() > old.getId() ? element : old));
+
+ // find the oldest elements with initiated and working states
+ String state = element.getState();
+ if (TxnStore.INITIATED_RESPONSE.equals(state) && (oldestEnqueueTime > element.getEnqueueTime())) {
+ oldestEnqueueTime = element.getEnqueueTime();
+ }
+
+ if (element.isSetStart()) {
+ if (TxnStore.WORKING_RESPONSE.equals(state) && (oldestWorkingTime > element.getStart())) {
+ oldestWorkingTime = element.getStart();
+ }
+ }
+
+ if (element.isSetCleanerStart()) {
+ if (TxnStore.CLEANING_RESPONSE.equals(state) && (oldestCleaningTime > element.getCleanerStart())) {
+ oldestCleaningTime = element.getCleanerStart();
+ }
+ }
+ }
+
+ stateCount = lastElements
+ .values()
+ .stream()
+ .collect(Collectors.groupingBy(ShowCompactResponseElement::getState, Collectors.counting()));
+
+ failedCompactionPercentage = calculateFailedPercentage(stateCount);
+
+ initiatorsCount = lastElements.values()
+ .stream()
+ //manually initiated compactions don't count
+ .filter(e -> !MANUALLY_INITIATED_COMPACTION.equals(getThreadIdFromId(e.getInitiatorId())))
+ .map(e -> getHostFromId(e.getInitiatorId()))
+ .filter(e -> !NO_VAL.equals(e))
+ .distinct()
+ .count();
+ initiatorVersionsCount = lastElements.values()
+ .stream()
+ .map(ShowCompactResponseElement::getInitiatorVersion)
+ .filter(Objects::nonNull)
+ .distinct()
+ .count();
+
+ workersCount = lastElements.values()
+ .stream()
+ .map(e -> getHostFromId(e.getWorkerid()))
+ .filter(e -> !NO_VAL.equals(e))
+ .distinct()
+ .count();
+ workerVersionsCount = lastElements.values()
+ .stream()
+ .map(ShowCompactResponseElement::getWorkerVersion)
+ .filter(Objects::nonNull)
+ .distinct()
+ .count();
+ }
+
+ List<String> allWorkerVersionsSince(long since) {
+ return compacts.stream()
+ .filter(comp -> (comp.isSetEnqueueTime() && (comp.getEnqueueTime() >= since))
+ || (comp.isSetStart() && (comp.getStart() >= since))
+ || (comp.isSetEndTime() && (comp.getEndTime() >= since)))
+ .filter(comp -> !TxnStore.DID_NOT_INITIATE_RESPONSE.equals(comp.getState()))
+ .map(ShowCompactResponseElement::getWorkerVersion)
+ .filter(Objects::nonNull)
+ .distinct()
+ .sorted()
+ .collect(Collectors.toList());
+ }
+
+ Map<String, Long> getStateCount() {
+ return new HashMap<>(stateCount);
+ }
+
+ Long getOldestEnqueueTime() {
+ return nullIfNotSet(oldestEnqueueTime);
+ }
+
+ Long getOldestWorkingTime() {
+ return nullIfNotSet(oldestWorkingTime);
+ }
+
+ Long getOldestCleaningTime() {
+ return nullIfNotSet(oldestCleaningTime);
+ }
+
+ Double getFailedCompactionPercentage() {
+ return failedCompactionPercentage;
+ }
+
+ long getInitiatorsCount() {
+ return initiatorsCount;
+ }
+
+ long getInitiatorVersionsCount() {
+ return initiatorVersionsCount;
+ }
+
+ long getWorkersCount() {
+ return workersCount;
+ }
+
+ long getWorkerVersionsCount() {
+ return workerVersionsCount;
+ }
+
+ private static Long nullIfNotSet(long value) {
+ if (value == OLDEST_TIME_NO_VALUE) {
+ return null;
+ }
+ return value;
+ }
+
+ private static Double calculateFailedPercentage(Map<String, Long> stateCount) {
+ long failed = unwrapToPrimitive(stateCount.get(TxnStore.FAILED_RESPONSE));
+ long notInitiated = unwrapToPrimitive(stateCount.get(TxnStore.DID_NOT_INITIATE_RESPONSE));
+ long succeeded = unwrapToPrimitive(stateCount.get(TxnStore.SUCCEEDED_RESPONSE));
+
+ long denominator = failed + notInitiated + succeeded;
+ if (denominator > 0) {
+ long numerator = failed + notInitiated;
+ return Long.valueOf(numerator).doubleValue() / Long.valueOf(denominator).doubleValue();
+ }
+
+ return null;
+ }
+
+ private static long unwrapToPrimitive(Long value) {
+ if (value == null) {
+ return 0L;
+ }
+ return value;
+ }
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestCompactionMetricData.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestCompactionMetricData.java
new file mode 100644
index 0000000..aa603bd
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestCompactionMetricData.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.metrics;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Collections;
+import java.util.UUID;
+
+import static java.util.Collections.emptyMap;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(MetastoreUnitTest.class)
+public class TestCompactionMetricData {
+
+ private static final long SINCE_EPOCH = 0L;
+
+ private static final String INITIATED = "initiated";
+ private static final String WORKING = "working";
+ private static final String READY_FOR_CLEANING = "ready for cleaning";
+ private static final String DID_NOT_INITIATE = "did not initiate";
+ private static final String SUCCEEDED = "succeeded";
+ private static final String FAILED = "failed";
+
+ @Test
+ public void testStateCountsCountedCorrectly() {
+ assertThat(
+ CompactionMetricData.of(null)
+ .getStateCount(),
+ is(emptyMap()));
+
+ assertThat(
+ CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction(1, "t1", null, INITIATED, CompactionType.MINOR),
+ aCompaction(2, "t1", null, INITIATED, CompactionType.MAJOR),
+ aCompaction(3, "t2", "part1", WORKING, CompactionType.MINOR),
+ aCompaction(5, "t2", "part1", WORKING, CompactionType.MAJOR),
+ aCompaction(6, "t2", "part2", WORKING, CompactionType.MAJOR),
+ aCompaction(7, "t3", null, READY_FOR_CLEANING, CompactionType.MAJOR)
+ ))
+ .getStateCount(),
+ is(ImmutableMap.of(
+ INITIATED, 1L,
+ WORKING, 2L,
+ READY_FOR_CLEANING, 1L
+ )));
+ }
+
+ @Test
+ public void testOldestEnqueuedValueCalculatedCorrectly() {
+ assertThat(
+ CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction(3, "t2", "part1", WORKING, CompactionType.MINOR, 90L, null, null)
+ ))
+ .getOldestEnqueueTime(),
+ nullValue());
+
+ assertThat(
+ CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction(1, "t1", null, INITIATED, CompactionType.MINOR, 150L, null, null),
+ aCompaction(2, "t1", null, INITIATED, CompactionType.MAJOR, 100L, null, null),
+ aCompaction(3, "t2", "part1", WORKING, CompactionType.MINOR, 90L, null, null),
+ aCompaction(6, "t2", "part2", DID_NOT_INITIATE, CompactionType.MAJOR, 50L, null, null),
+ aCompaction(7, "t3", null, READY_FOR_CLEANING, CompactionType.MAJOR, 300L, null, null)
+ ))
+ .getOldestEnqueueTime(),
+ is(100L));
+ }
+
+ @Test
+ public void testOldestWorkingValueCalculatedCorrectly() {
+ assertThat(
+ CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction(3, "t2", "part1", INITIATED, CompactionType.MINOR, null, 90L, null)
+ ))
+ .getOldestWorkingTime(),
+ nullValue());
+
+ assertThat(
+ CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction(1, "t1", null, INITIATED, CompactionType.MINOR, null, 150L, null),
+ aCompaction(2, "t1", null, INITIATED, CompactionType.MAJOR, null, 100L, null),
+ aCompaction(4, "t2", "part1", WORKING, CompactionType.MINOR, null, 90L, null),
+ aCompaction(3, "t2", "part1", WORKING, CompactionType.MINOR, null, 70L, null),
+ aCompaction(6, "t2", "part2", DID_NOT_INITIATE, CompactionType.MAJOR, null, 50L, null),
+ aCompaction(7, "t3", null, READY_FOR_CLEANING, CompactionType.MAJOR, null, 300L, null)
+ ))
+ .getOldestWorkingTime(),
+ is(70L));
+ }
+
+ @Test
+ public void testOldestCleaningValueCalculatedCorrectly() {
+ assertThat(
+ CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction(3, "t2", "part1", INITIATED, CompactionType.MINOR, null, null, 90L)
+ ))
+ .getOldestCleaningTime(),
+ nullValue());
+
+ assertThat(
+ CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction(1, "t1", null, INITIATED, CompactionType.MINOR, null, null, 150L),
+ aCompaction(2, "t1", null, READY_FOR_CLEANING, CompactionType.MAJOR, null, null, 100L),
+ aCompaction(4, "t2", "part1", WORKING, CompactionType.MINOR, null, null, 90L),
+ aCompaction(3, "t2", "part1", WORKING, CompactionType.MINOR, null, null, 70L),
+ aCompaction(7, "t3", null, READY_FOR_CLEANING, CompactionType.MAJOR, null, null, 300L)
+ ))
+ .getOldestCleaningTime(),
+ is(100L));
+ }
+
+ @Test
+ public void testFailedPercentageCalculatedCorrectly() {
+ assertThat(
+ CompactionMetricData.of(
+ ImmutableList.of())
+ .getFailedCompactionPercentage(),
+ nullValue());
+
+ assertThat(
+ CompactionMetricData.of(
+ ImmutableList.of(aCompaction(1, "t1", "p1", SUCCEEDED, CompactionType.MINOR)))
+ .getFailedCompactionPercentage(),
+ is(0.0D));
+
+ assertThat(
+ CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction(1, "t1", "p1", FAILED, CompactionType.MINOR),
+ aCompaction(2, "t2", "p1", SUCCEEDED, CompactionType.MINOR)))
+ .getFailedCompactionPercentage(),
+ is(0.5D));
+
+ assertThat(
+ CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction(1, "t1", "p1", FAILED, CompactionType.MINOR),
+ aCompaction(2, "t2", "p1", DID_NOT_INITIATE, CompactionType.MINOR),
+ aCompaction(3, "t3", "p1", SUCCEEDED, CompactionType.MINOR),
+ aCompaction(4, "t4", "p1", SUCCEEDED, CompactionType.MINOR)))
+ .getFailedCompactionPercentage(),
+ is(0.5D));
+
+ assertThat(
+ CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction(1, "t1", "p1", FAILED, CompactionType.MINOR),
+ aCompaction(2, "t2", "p1", DID_NOT_INITIATE, CompactionType.MINOR)))
+ .getFailedCompactionPercentage(),
+ is(1.0D));
+
+ assertThat(
+ CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction(1, "t1", "p1", FAILED, CompactionType.MINOR)))
+ .getFailedCompactionPercentage(),
+ is(1.0D));
+
+ assertThat(
+ CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction(1, "t1", "p1", DID_NOT_INITIATE, CompactionType.MINOR)))
+ .getFailedCompactionPercentage(),
+ is(1.0D));
+ }
+
+ @Test
+ public void testInitiatorCountCalculatedCorrectly() {
+ assertThat(CompactionMetricData.of(Collections.emptyList())
+ .getInitiatorsCount(),
+ is(0L));
+ assertThat(CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction(1, null, null, null, (String) null),
+ aCompaction(2, "host1-initiator", null, null, (String) null),
+ aCompaction(3, "host2-initiator-manual", null, null, (String) null),
+ aCompaction(4, "host3-initiator", null, null, (String) null)))
+ .getInitiatorsCount(),
+ is(2L));
+ }
+
+ @Test
+ public void testInitiatorVersionsCalculatedCorrectly() {
+ assertThat(CompactionMetricData.of(Collections.emptyList())
+ .getInitiatorVersionsCount(),
+ is(0L));
+ assertThat(CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction(1, null, "1.0", null, (String) null),
+ aCompaction(2, null, "3.0", null, (String) null),
+ aCompaction(3, null, "4.0", null, (String) null),
+ aCompaction(4, null, null, null, (String) null),
+ aCompaction(5, null, "4.0", null, (String) null)))
+ .getInitiatorVersionsCount(),
+ is(3L));
+ }
+
+ @Test
+ public void testWorkerCountCalculatedCorrectly() {
+ assertThat(CompactionMetricData.of(Collections.emptyList())
+ .getWorkersCount(),
+ is(0L));
+ assertThat(CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction(1, null, null, null, "4.0"),
+ aCompaction(2, null, null, "host1-worker", "4.0"),
+ aCompaction(3, null, null, "host2-worker", "4.0")))
+ .getWorkersCount(),
+ is(2L));
+ }
+
+ @Test
+ public void testWorkerVersionsCalculatedCorrectly() {
+ assertThat(CompactionMetricData.of(Collections.emptyList())
+ .getWorkerVersionsCount(),
+ is(0L));
+ assertThat(CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction(1, null, null, null, "1.0"),
+ aCompaction(2, null, null, null, "3.0"),
+ aCompaction(3, null, null, null, "4.0"),
+ aCompaction(4, null, null, null, (String) null),
+ aCompaction(5, null, null, null, "4.0")))
+ .getWorkerVersionsCount(),
+ is(3L));
+ }
+
+ @Test
+ public void testCollectWorkerVersionsEmptyLists() {
+ assertThat(CompactionMetricData.of(Collections.emptyList()).allWorkerVersionsSince(SINCE_EPOCH),
+
+ is(Collections.emptyList()));
+ }
+
+ @Test
+ public void testCollectWorkerVersionsDidNotInitiateGettingFilteredOut() {
+ assertThat(CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction("DoNotShowUp", DID_NOT_INITIATE, 1L, 1L, 1L),
+ aCompaction("1.0", INITIATED, 1L, 1L, 1L)))
+ .allWorkerVersionsSince(SINCE_EPOCH),
+
+ is(Collections.singletonList("1.0"))
+ );
+ }
+
+ @Test
+ public void testCollectWorkerVersionsNullVersionGettingFilteredOut() {
+ assertThat(CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction(null, INITIATED, 1L, 1L, 1L)))
+ .allWorkerVersionsSince(SINCE_EPOCH),
+
+ is(Collections.emptyList())
+ );
+ }
+
+ @Test
+ public void testCollectWorkerVersionsTimeThreshold() {
+ assertThat(CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction("0.0-not-shown", INITIATED, 99L, null, null),
+ aCompaction("0.1-not-shown", INITIATED, 99L, 99L, null),
+ aCompaction("0.2-not-shown", INITIATED, 99L, 99L, 99L),
+
+ aCompaction("1.0", INITIATED, 100L, null, null),
+ aCompaction("1.1", WORKING, 99L, 100L, null),
+ aCompaction("1.2", SUCCEEDED, 99L, 99L, 100L)
+ ))
+ .allWorkerVersionsSince(100),
+
+ is(ImmutableList.of("1.0", "1.1", "1.2"))
+ );
+ }
+
+ @Test
+ public void testCollectWorkerVersionsSortedAndAvoidDuplicates() {
+ assertThat(CompactionMetricData.of(
+ ImmutableList.of(
+ aCompaction("2.0", INITIATED, 1L, null, null),
+ aCompaction("2.1", INITIATED, 1L, null, null),
+ aCompaction("2.10", INITIATED, 1L, null, null),
+ aCompaction("2.2", INITIATED, 1L, null, null),
+ aCompaction("3.0", WORKING, 1L, null, null),
+ aCompaction("1.0", INITIATED, 1L, null, null),
+ aCompaction("1.0", WORKING, 1L, null, null)
+ ))
+ .allWorkerVersionsSince(SINCE_EPOCH),
+
+ is(ImmutableList.of("1.0", "2.0", "2.1", "2.10", "2.2", "3.0"))
+ );
+ }
+
+ private static ShowCompactResponseElement aCompaction(long id, String table, String partition, String state,
+ CompactionType type) {
+ ShowCompactResponseElement e = new ShowCompactResponseElement("db_name", table, type, state);
+ e.setId(id);
+ e.setPartitionname(partition);
+ return e;
+ }
+
+ private static ShowCompactResponseElement aCompaction(long id,
+ String initiatorId, String initiatorVersion,
+ String workerId, String workerVersion) {
+ ShowCompactResponseElement e = new ShowCompactResponseElement("db_name", UUID.randomUUID().toString(),
+ CompactionType.MAJOR, INITIATED);
+ e.setId(id);
+
+ e.setInitiatorId(initiatorId);
+ e.setInitiatorVersion(initiatorVersion);
+
+ e.setWorkerid(workerId);
+ e.setWorkerVersion(workerVersion);
+
+ return e;
+ }
+
+ private static ShowCompactResponseElement aCompaction(long id, String table, String partition, String state,
+ CompactionType type, Long enqueuedTime, Long startTime, Long cleanerStart) {
+ ShowCompactResponseElement e = new ShowCompactResponseElement("db_name", table, type, state);
+ e.setId(id);
+ e.setPartitionname(partition);
+
+ if (enqueuedTime != null) {
+ e.setEnqueueTime(enqueuedTime);
+ }
+
+ if (startTime != null) {
+ e.setStart(startTime);
+ }
+
+ if (cleanerStart != null) {
+ e.setCleanerStart(cleanerStart);
+ }
+
+ return e;
+ }
+
+ private static ShowCompactResponseElement aCompaction(String workerVersion, String state,
+ Long enqueuedTime, Long startTime, Long endTime) {
+
+ ShowCompactResponseElement e = new ShowCompactResponseElement("db_name", "table_name", CompactionType.MINOR, state);
+ e.setWorkerVersion(workerVersion);
+
+ if (enqueuedTime != null) {
+ e.setEnqueueTime(enqueuedTime);
+ }
+
+ if (startTime != null) {
+ e.setStart(startTime);
+ }
+
+ if (endTime != null) {
+ e.setEndTime(endTime);
+ }
+
+ return e;
+ }
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestMultipleWorkerVersionDetection.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestMultipleWorkerVersionDetection.java
deleted file mode 100644
index 1476f4d..0000000
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/metrics/TestMultipleWorkerVersionDetection.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.metrics;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.hamcrest.CoreMatchers;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.util.Collections;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-
-@Category(MetastoreUnitTest.class)
-public class TestMultipleWorkerVersionDetection {
-
- private final long SINCE_EPOCH = 0L;
-
- @Test
- public void testCollectWorkerVersionsEmptyLists() {
- assertThat(AcidMetricService.collectWorkerVersions(null, SINCE_EPOCH), CoreMatchers.is(Collections.emptyList()));
- assertThat(AcidMetricService.collectWorkerVersions(Collections.emptyList(), SINCE_EPOCH),
- CoreMatchers.is(Collections.emptyList()));
- }
-
- @Test
- public void testCollectWorkerVersionsDidNotInitiateGettingFilteredOut() {
- assertThat(AcidMetricService.collectWorkerVersions(
- ImmutableList.of(
- showCompactResponse("DoNotShowUp", "did not initiate", 1L, 1L, 1L),
- showCompactResponse("1.0", "initiated", 1L, 1L, 1L)),
- SINCE_EPOCH),
-
- CoreMatchers.is(Collections.singletonList("1.0"))
- );
- }
-
- @Test
- public void testCollectWorkerVersionsNullVersionGettingFilteredOut() {
- assertThat(AcidMetricService.collectWorkerVersions(
- ImmutableList.of(
- showCompactResponse(null, "initiated", 1L, 1L, 1L)),
- SINCE_EPOCH),
-
- CoreMatchers.is(Collections.emptyList())
- );
- }
-
- @Test
- public void testCollectWorkerVersionsTimeThreshold() {
- assertThat(AcidMetricService.collectWorkerVersions(
- ImmutableList.of(
- showCompactResponse("0.0-not-shown", "initiated", 99L, null, null),
- showCompactResponse("0.1-not-shown", "initiated", 99L, 99L, null),
- showCompactResponse("0.2-not-shown", "initiated", 99L, 99L, 99L),
-
- showCompactResponse("1.0", "initiated", 100L, null, null),
- showCompactResponse("1.1", "working", 99L, 100L, null),
- showCompactResponse("1.2", "succeeded", 99L, 99L, 100L)
- ),
- 100),
-
- CoreMatchers.is(ImmutableList.of("1.0", "1.1", "1.2"))
- );
- }
-
- @Test
- public void testCollectWorkerVersionsSortedAndAvoidDuplicates() {
- assertThat(AcidMetricService.collectWorkerVersions(
- ImmutableList.of(
- showCompactResponse("2.0", "initiated", 1L, null, null),
- showCompactResponse("2.1", "initiated", 1L, null, null),
- showCompactResponse("2.10", "initiated", 1L, null, null),
- showCompactResponse("2.2", "initiated", 1L, null, null),
- showCompactResponse("3.0", "working", 1L, null, null),
- showCompactResponse("1.0", "initiated", 1L, null, null),
- showCompactResponse("1.0", "working", 1L, null, null)
- ),
- SINCE_EPOCH),
-
- CoreMatchers.is(ImmutableList.of("1.0", "2.0", "2.1", "2.10", "2.2", "3.0"))
- );
- }
-
- private static ShowCompactResponseElement showCompactResponse(String workerVersion, String state,
- Long enqueuedTime, Long startTime, Long endTime) {
-
- ShowCompactResponseElement e = new ShowCompactResponseElement("db_name", "table_name", CompactionType.MINOR, state);
- e.setWorkerVersion(workerVersion);
-
- if (enqueuedTime != null) {
- e.setEnqueueTime(enqueuedTime);
- }
-
- if (startTime != null) {
- e.setStart(startTime);
- }
-
- if (endTime != null) {
- e.setEndTime(endTime);
- }
-
- return e;
- }
-}