You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kl...@apache.org on 2021/09/13 07:52:17 UTC
[hive] branch master updated: HIVE-25513: Delta metrics collection may cause NPE (Karen Coppage, reviewed by Laszlo Pinter)
This is an automated email from the ASF dual-hosted git repository.
klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 256fcbe HIVE-25513: Delta metrics collection may cause NPE (Karen Coppage, reviewed by Laszlo Pinter)
256fcbe is described below
commit 256fcbe158938bdaf26b8adf01dcf591ca13da28
Author: Karen Coppage <kl...@apache.org>
AuthorDate: Mon Sep 13 09:52:01 2021 +0200
HIVE-25513: Delta metrics collection may cause NPE (Karen Coppage, reviewed by Laszlo Pinter)
Closes #2633.
---
.../metrics/DeltaFilesMetricReporter.java | 86 ++++++++++++----------
.../ql/txn/compactor/TestDeltaFilesMetrics.java | 28 +++++++
2 files changed, 74 insertions(+), 40 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
index 61b6a55..311d5ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
@@ -62,7 +62,6 @@ import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
@@ -225,40 +224,48 @@ public class DeltaFilesMetricReporter {
EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>> deltaFilesStats, Configuration conf)
throws IOException {
- long baseSize = getBaseSize(dir);
- int numObsoleteDeltas = getNumObsoleteDeltas(dir, checkThresholdInSec);
+ try {
+ long baseSize = getBaseSize(dir);
+ int numObsoleteDeltas = getNumObsoleteDeltas(dir, checkThresholdInSec);
- int numDeltas = 0;
- int numSmallDeltas = 0;
+ int numDeltas = 0;
+ int numSmallDeltas = 0;
- long now = new Date().getTime();
+ long now = new Date().getTime();
- for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
- if (now - getModificationTime(delta, dir.getFs()) >= checkThresholdInSec * 1000) {
- numDeltas++;
+ for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+ if (now - getModificationTime(delta, dir.getFs()) >= checkThresholdInSec * 1000) {
+ numDeltas++;
- long deltaSize = getDirSize(delta, dir.getFs());
- if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
- numSmallDeltas++;
+ long deltaSize = getDirSize(delta, dir.getFs());
+ if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+ numSmallDeltas++;
+ }
}
}
- }
- logDeltaDirMetrics(dir, conf, numObsoleteDeltas, numDeltas, numSmallDeltas);
+ logDeltaDirMetrics(dir, conf, numObsoleteDeltas, numDeltas, numSmallDeltas);
- String serializedMetadata = conf.get(JOB_CONF_DELTA_FILES_METRICS_METADATA);
- HashMap<Path, DeltaFilesMetadata> pathToMetadata = new HashMap<>();
- pathToMetadata = SerializationUtilities.deserializeObject(serializedMetadata, pathToMetadata.getClass());
- if (pathToMetadata == null) {
- LOG.warn("Delta metrics can't be updated since the metadata is null.");
- return;
+ String serializedMetadata = conf.get(JOB_CONF_DELTA_FILES_METRICS_METADATA);
+ if (serializedMetadata == null) {
+ LOG.warn("delta.files.metrics.metadata is missing from config. Delta metrics can't be updated.");
+ return;
+ }
+ HashMap<Path, DeltaFilesMetadata> pathToMetadata = new HashMap<>();
+ pathToMetadata = SerializationUtilities.deserializeObject(serializedMetadata, pathToMetadata.getClass());
+ if (pathToMetadata == null) {
+ LOG.warn("Delta metrics can't be updated since the metadata is null.");
+ return;
+ }
+ DeltaFilesMetadata metadata = pathToMetadata.get(dir.getPath());
+ filterAndAddToDeltaFilesStats(NUM_DELTAS, numDeltas, deltasThreshold, deltaFilesStats, metadata, maxCacheSize);
+ filterAndAddToDeltaFilesStats(NUM_OBSOLETE_DELTAS, numObsoleteDeltas, obsoleteDeltasThreshold, deltaFilesStats,
+ metadata, maxCacheSize);
+ filterAndAddToDeltaFilesStats(NUM_SMALL_DELTAS, numSmallDeltas, deltasThreshold, deltaFilesStats, metadata,
+ maxCacheSize);
+ } catch (Throwable t) {
+ LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
}
- DeltaFilesMetadata metadata = pathToMetadata.get(dir.getPath());
- filterAndAddToDeltaFilesStats(NUM_DELTAS, numDeltas, deltasThreshold, deltaFilesStats, metadata, maxCacheSize);
- filterAndAddToDeltaFilesStats(NUM_OBSOLETE_DELTAS, numObsoleteDeltas, obsoleteDeltasThreshold, deltaFilesStats,
- metadata, maxCacheSize);
- filterAndAddToDeltaFilesStats(NUM_SMALL_DELTAS, numSmallDeltas, deltasThreshold, deltaFilesStats,
- metadata, maxCacheSize);
}
/**
@@ -342,12 +349,6 @@ public class DeltaFilesMetricReporter {
return numObsoleteDeltas;
}
- private static String getRelPath(AcidUtils.Directory directory) {
- return directory.getPath().getName().contains("=") ?
- directory.getPath().getParent().getName() + Path.SEPARATOR + directory.getPath().getName() :
- directory.getPath().getName();
- }
-
public static void createCountersForAcidMetrics(TezCounters tezCounters, JobConf jobConf) {
if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) &&
MetastoreConf.getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
@@ -364,19 +365,24 @@ public class DeltaFilesMetricReporter {
public static void addAcidMetricsToConfObj(EnumMap<DeltaFilesMetricType,
Queue<Pair<String, Integer>>> deltaFilesStats, Configuration conf) {
- deltaFilesStats.forEach((type, value) ->
- conf.set(type.name(), Joiner.on(ENTRY_SEPARATOR).withKeyValueSeparator(KEY_VALUE_SEPARATOR).join(value)));
+ try {
+ deltaFilesStats.forEach((type, value) -> conf
+ .set(type.name(), Joiner.on(ENTRY_SEPARATOR).withKeyValueSeparator(KEY_VALUE_SEPARATOR).join(value)));
+
+ } catch (Exception e) {
+ LOG.warn("Couldn't add Delta metrics to conf object", e);
+ }
}
public static void backPropagateAcidMetrics(JobConf jobConf, Configuration conf) {
if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) &&
MetastoreConf.getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-
- Arrays.stream(DeltaFilesMetricType.values())
- .filter(type -> conf.get(type.name()) != null)
- .forEach(type ->
- jobConf.set(type.name(), conf.get(type.name()))
- );
+ try {
+ Arrays.stream(DeltaFilesMetricType.values()).filter(type -> conf.get(type.name()) != null)
+ .forEach(type -> jobConf.set(type.name(), conf.get(type.name())));
+ } catch (Exception e) {
+ LOG.warn("Couldn't back propagate Delta metrics to jobConf object", e);
+ }
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java
index ee7fbf5..e321f33 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java
@@ -19,10 +19,14 @@ package org.apache.hadoop.hive.ql.txn.compactor;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter;
import org.apache.tez.common.counters.TezCounters;
import org.jetbrains.annotations.NotNull;
@@ -35,8 +39,10 @@ import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
+import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_DELTAS;
@@ -174,6 +180,28 @@ public class TestDeltaFilesMetrics extends CompactorTest {
}}, gaugeToMap(MetricsConstants.COMPACTION_NUM_DELTAS));
}
+ @Test
+ public void testMergeDeltaFilesStatsNullData() throws Exception {
+ setUpHiveConf();
+ MetricsFactory.close();
+ MetricsFactory.init(conf);
+ DeltaFilesMetricReporter.init(conf);
+
+ AcidDirectory dir = new AcidDirectory(new Path("/"), FileSystem.get(conf), null);
+ long checkThresholdInSec = HiveConf.getTimeVar(conf,
+ HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_CHECK_THRESHOLD, TimeUnit.SECONDS);
+ float deltaPctThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_PCT_THRESHOLD);
+ int deltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD);
+ int obsoleteDeltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+ int maxCacheSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE);
+ EnumMap<DeltaFilesMetricReporter.DeltaFilesMetricType, Queue<Pair<String, Integer>>> deltaFilesStats =
+ new EnumMap<>(DeltaFilesMetricReporter.DeltaFilesMetricType.class);
+
+ //conf.get(JOB_CONF_DELTA_FILES_METRICS_METADATA) will not have a value assigned; this test checks for an NPE
+ DeltaFilesMetricReporter.mergeDeltaFilesStats(dir,checkThresholdInSec, deltaPctThreshold, deltasThreshold,
+ obsoleteDeltasThreshold, maxCacheSize, deltaFilesStats, conf);
+ }
+
static void verifyMetricsMatch(Map<String, String> expected, Map<String, String> actual) {
Assert.assertTrue("Actual metrics " + actual + " don't match expected: " + expected,
equivalent(expected, actual));