You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kl...@apache.org on 2021/09/13 07:52:17 UTC

[hive] branch master updated: HIVE-25513: Delta metrics collection may cause NPE (Karen Coppage, reviewed by Laszlo Pinter)

This is an automated email from the ASF dual-hosted git repository.

klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 256fcbe  HIVE-25513: Delta metrics collection may cause NPE (Karen Coppage, reviewed by Laszlo Pinter)
256fcbe is described below

commit 256fcbe158938bdaf26b8adf01dcf591ca13da28
Author: Karen Coppage <kl...@apache.org>
AuthorDate: Mon Sep 13 09:52:01 2021 +0200

    HIVE-25513: Delta metrics collection may cause NPE (Karen Coppage, reviewed by Laszlo Pinter)
    
    Closes #2633.
---
 .../metrics/DeltaFilesMetricReporter.java          | 86 ++++++++++++----------
 .../ql/txn/compactor/TestDeltaFilesMetrics.java    | 28 +++++++
 2 files changed, 74 insertions(+), 40 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
index 61b6a55..311d5ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
@@ -62,7 +62,6 @@ import java.util.Date;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
 
@@ -225,40 +224,48 @@ public class DeltaFilesMetricReporter {
       EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>> deltaFilesStats, Configuration conf)
       throws IOException {
 
-    long baseSize = getBaseSize(dir);
-    int numObsoleteDeltas = getNumObsoleteDeltas(dir, checkThresholdInSec);
+    try {
+      long baseSize = getBaseSize(dir);
+      int numObsoleteDeltas = getNumObsoleteDeltas(dir, checkThresholdInSec);
 
-    int numDeltas = 0;
-    int numSmallDeltas = 0;
+      int numDeltas = 0;
+      int numSmallDeltas = 0;
 
-    long now = new Date().getTime();
+      long now = new Date().getTime();
 
-    for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
-      if (now - getModificationTime(delta, dir.getFs()) >= checkThresholdInSec * 1000) {
-        numDeltas++;
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        if (now - getModificationTime(delta, dir.getFs()) >= checkThresholdInSec * 1000) {
+          numDeltas++;
 
-        long deltaSize = getDirSize(delta, dir.getFs());
-        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
-          numSmallDeltas++;
+          long deltaSize = getDirSize(delta, dir.getFs());
+          if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+            numSmallDeltas++;
+          }
         }
       }
-    }
 
-    logDeltaDirMetrics(dir, conf, numObsoleteDeltas, numDeltas, numSmallDeltas);
+      logDeltaDirMetrics(dir, conf, numObsoleteDeltas, numDeltas, numSmallDeltas);
 
-    String serializedMetadata = conf.get(JOB_CONF_DELTA_FILES_METRICS_METADATA);
-    HashMap<Path, DeltaFilesMetadata> pathToMetadata = new HashMap<>();
-    pathToMetadata = SerializationUtilities.deserializeObject(serializedMetadata, pathToMetadata.getClass());
-    if (pathToMetadata == null) {
-      LOG.warn("Delta metrics can't be updated since the metadata is null.");
-      return;
+      String serializedMetadata = conf.get(JOB_CONF_DELTA_FILES_METRICS_METADATA);
+      if (serializedMetadata == null) {
+        LOG.warn("delta.files.metrics.metadata is missing from config. Delta metrics can't be updated.");
+        return;
+      }
+      HashMap<Path, DeltaFilesMetadata> pathToMetadata = new HashMap<>();
+      pathToMetadata = SerializationUtilities.deserializeObject(serializedMetadata, pathToMetadata.getClass());
+      if (pathToMetadata == null) {
+        LOG.warn("Delta metrics can't be updated since the metadata is null.");
+        return;
+      }
+      DeltaFilesMetadata metadata = pathToMetadata.get(dir.getPath());
+      filterAndAddToDeltaFilesStats(NUM_DELTAS, numDeltas, deltasThreshold, deltaFilesStats, metadata, maxCacheSize);
+      filterAndAddToDeltaFilesStats(NUM_OBSOLETE_DELTAS, numObsoleteDeltas, obsoleteDeltasThreshold, deltaFilesStats,
+          metadata, maxCacheSize);
+      filterAndAddToDeltaFilesStats(NUM_SMALL_DELTAS, numSmallDeltas, deltasThreshold, deltaFilesStats, metadata,
+          maxCacheSize);
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
     }
-    DeltaFilesMetadata metadata = pathToMetadata.get(dir.getPath());
-    filterAndAddToDeltaFilesStats(NUM_DELTAS, numDeltas, deltasThreshold, deltaFilesStats, metadata, maxCacheSize);
-    filterAndAddToDeltaFilesStats(NUM_OBSOLETE_DELTAS, numObsoleteDeltas, obsoleteDeltasThreshold, deltaFilesStats,
-        metadata, maxCacheSize);
-    filterAndAddToDeltaFilesStats(NUM_SMALL_DELTAS, numSmallDeltas, deltasThreshold, deltaFilesStats,
-        metadata, maxCacheSize);
   }
 
   /**
@@ -342,12 +349,6 @@ public class DeltaFilesMetricReporter {
     return numObsoleteDeltas;
   }
 
-  private static String getRelPath(AcidUtils.Directory directory) {
-    return directory.getPath().getName().contains("=") ?
-      directory.getPath().getParent().getName() + Path.SEPARATOR + directory.getPath().getName() :
-      directory.getPath().getName();
-  }
-
   public static void createCountersForAcidMetrics(TezCounters tezCounters, JobConf jobConf) {
     if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) &&
       MetastoreConf.getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
@@ -364,19 +365,24 @@ public class DeltaFilesMetricReporter {
 
   public static void addAcidMetricsToConfObj(EnumMap<DeltaFilesMetricType,
       Queue<Pair<String, Integer>>> deltaFilesStats, Configuration conf) {
-    deltaFilesStats.forEach((type, value) ->
-        conf.set(type.name(), Joiner.on(ENTRY_SEPARATOR).withKeyValueSeparator(KEY_VALUE_SEPARATOR).join(value)));
+    try {
+      deltaFilesStats.forEach((type, value) -> conf
+          .set(type.name(), Joiner.on(ENTRY_SEPARATOR).withKeyValueSeparator(KEY_VALUE_SEPARATOR).join(value)));
+
+    } catch (Exception e) {
+      LOG.warn("Couldn't add Delta metrics to conf object", e);
+    }
   }
 
   public static void backPropagateAcidMetrics(JobConf jobConf, Configuration conf) {
     if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) &&
       MetastoreConf.getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-
-      Arrays.stream(DeltaFilesMetricType.values())
-        .filter(type -> conf.get(type.name()) != null)
-        .forEach(type ->
-            jobConf.set(type.name(), conf.get(type.name()))
-        );
+      try {
+        Arrays.stream(DeltaFilesMetricType.values()).filter(type -> conf.get(type.name()) != null)
+            .forEach(type -> jobConf.set(type.name(), conf.get(type.name())));
+      } catch (Exception e) {
+        LOG.warn("Couldn't back propagate Delta metrics to jobConf object", e);
+      }
     }
   }
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java
index ee7fbf5..e321f33 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java
@@ -19,10 +19,14 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
 import org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter;
 import org.apache.tez.common.counters.TezCounters;
 import org.jetbrains.annotations.NotNull;
@@ -35,8 +39,10 @@ import javax.management.MBeanInfo;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_DELTAS;
@@ -174,6 +180,28 @@ public class TestDeltaFilesMetrics extends CompactorTest  {
     }}, gaugeToMap(MetricsConstants.COMPACTION_NUM_DELTAS));
   }
 
+  @Test
+  public void testMergeDeltaFilesStatsNullData() throws Exception {
+    setUpHiveConf();
+    MetricsFactory.close();
+    MetricsFactory.init(conf);
+    DeltaFilesMetricReporter.init(conf);
+
+    AcidDirectory dir = new AcidDirectory(new Path("/"), FileSystem.get(conf), null);
+    long checkThresholdInSec = HiveConf.getTimeVar(conf,
+        HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_CHECK_THRESHOLD, TimeUnit.SECONDS);
+    float deltaPctThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    int maxCacheSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE);
+    EnumMap<DeltaFilesMetricReporter.DeltaFilesMetricType, Queue<Pair<String, Integer>>> deltaFilesStats =
+        new EnumMap<>(DeltaFilesMetricReporter.DeltaFilesMetricType.class);
+
+    //conf.get(JOB_CONF_DELTA_FILES_METRICS_METADATA) will not have a value assigned; this test checks for an NPE
+    DeltaFilesMetricReporter.mergeDeltaFilesStats(dir,checkThresholdInSec, deltaPctThreshold, deltasThreshold,
+        obsoleteDeltasThreshold, maxCacheSize, deltaFilesStats, conf);
+  }
+
   static void verifyMetricsMatch(Map<String, String> expected, Map<String, String> actual) {
     Assert.assertTrue("Actual metrics " + actual + " don't match expected: " + expected,
         equivalent(expected, actual));