You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2019/10/28 16:20:10 UTC

[GitHub] [accumulo] EdColeman commented on a change in pull request #1391: Include additional details in FATE metrics reported.

EdColeman commented on a change in pull request #1391: Include additional details in FATE metrics reported.
URL: https://github.com/apache/accumulo/pull/1391#discussion_r339659950
 
 

 ##########
 File path: server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetrics.java
 ##########
 @@ -16,61 +16,234 @@
  */
 package org.apache.accumulo.master.metrics.fate;
 
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.fate.AdminUtil;
+import org.apache.accumulo.fate.ReadOnlyTStore;
+import org.apache.accumulo.fate.ZooStore;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.master.metrics.MasterMetrics;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class FateMetrics extends MasterMetrics {
 
+  private static final Logger log = LoggerFactory.getLogger(FateMetrics.class);
+
   // limit calls to update fate counters to guard against hammering zookeeper.
   private static final long DEFAULT_MIN_REFRESH_DELAY = TimeUnit.SECONDS.toMillis(10);
+  private long minimumRefreshDelay;
 
-  private volatile long minimumRefreshDelay;
-
-  private final ServerContext context;
+  private static final String FATE_TX_STATE_METRIC_PREFIX = "FateTxState_";
+  private static final String FATE_OP_TYPE_METRIC_PREFIX = "FateTxOpType_";
 
   private final MutableGaugeLong currentFateOps;
   private final MutableGaugeLong zkChildFateOpsTotal;
   private final MutableGaugeLong zkConnectionErrorsTotal;
 
+  private final Map<String,MutableGaugeLong> fateTypeCounts = new TreeMap<>();
+  private final Map<String,MutableGaugeLong> fateOpCounts = new TreeMap<>();
+
   private final AtomicReference<FateMetricValues> metricValues;
 
   private volatile long lastUpdate = 0;
 
+  private final IZooReaderWriter zooReaderWriter;
+  private final ReadOnlyTStore<FateMetrics> zooStore;
+  private final String fateRootPath;
+
   public FateMetrics(final ServerContext context, final long minimumRefreshDelay) {
     super("Fate", "Fate Metrics", "fate");
 
-    this.context = context;
+    zooReaderWriter = context.getZooReaderWriter();
+    fateRootPath = context.getZooKeeperRoot() + Constants.ZFATE;
+
+    try {
+
+      zooStore = new ZooStore<>(fateRootPath, zooReaderWriter);
+
+    } catch (KeeperException ex) {
+      throw new IllegalStateException(
+          "FATE Metrics - Failed to create zoo store - metrics unavailable", ex);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "FATE Metrics - Interrupt received while initializing zoo store");
+    }
 
     this.minimumRefreshDelay = Math.max(DEFAULT_MIN_REFRESH_DELAY, minimumRefreshDelay);
 
-    metricValues = new AtomicReference<>(FateMetricValues.updateFromZookeeper(context, null));
+    metricValues = new AtomicReference<>(updateFromZookeeper());
 
     MetricsRegistry registry = super.getRegistry();
+
     currentFateOps = registry.newGauge("currentFateOps", "Current number of FATE Ops", 0L);
     zkChildFateOpsTotal = registry.newGauge("totalFateOps", "Total FATE Ops", 0L);
     zkConnectionErrorsTotal =
         registry.newGauge("totalZkConnErrors", "Total ZK Connection Errors", 0L);
 
+    for (ReadOnlyTStore.TStatus t : ReadOnlyTStore.TStatus.values()) {
+      MutableGaugeLong g = registry.newGauge(FATE_TX_STATE_METRIC_PREFIX + t.name().toUpperCase(),
+          "Transaction count for " + t.name() + " transactions", 0L);
+      fateTypeCounts.put(t.name(), g);
+    }
+  }
+
+  /**
+   * For testing only: allow refresh delay to be set to any value, over riding the enforced minimum.
+   *
+   * @param minimumRefreshDelay
+   *          set new min refresh value, in seconds.
+   */
+  void overrideRefresh(final long minimumRefreshDelay) {
+    long delay = Math.max(0, minimumRefreshDelay);
+    this.minimumRefreshDelay = TimeUnit.SECONDS.toMillis(delay);
   }
 
   @Override
   protected void prepareMetrics() {
-    FateMetricValues fateMetrics = metricValues.get();
+
     long now = System.currentTimeMillis();
+
     if ((lastUpdate + minimumRefreshDelay) < now) {
-      fateMetrics = FateMetricValues.updateFromZookeeper(context, fateMetrics);
-      metricValues.set(fateMetrics);
+      metricValues.set(updateFromZookeeper());
+
       lastUpdate = now;
-      // update individual gauges that are reported.
-      currentFateOps.set(fateMetrics.getCurrentFateOps());
-      zkChildFateOpsTotal.set(fateMetrics.getZkFateChildOpsTotal());
-      zkConnectionErrorsTotal.set(fateMetrics.getZkConnectionErrors());
+
+      recordValues();
     }
   }
 
+  private void recordValues() {
+
+    FateMetricValues measurement = metricValues.get();
+
+    // update individual gauges that are reported.
+    currentFateOps.set(measurement.getCurrentFateOps());
+    zkChildFateOpsTotal.set(measurement.getZkFateChildOpsTotal());
+    zkConnectionErrorsTotal.set(measurement.getZkConnectionErrors());
+
+    // the number FATE Tx states (NEW< IN_PROGRESS...) are fixed - the underlying
+    // getTxStateCounters call will return a current valid count for each possible state.
+    Map<String,Long> states = measurement.getTxStateCounters();
+
+    states.forEach((key, value) -> {
+      MutableGaugeLong v = fateTypeCounts.get(key);
+      if (v != null) {
+        v.set(value);
+      } else {
+        v = super.getRegistry().newGauge(metricNameHelper(FATE_TX_STATE_METRIC_PREFIX, key),
+            "By transaction state count for " + key, value);
+        fateTypeCounts.put(key, v);
+      }
+    });
+
+    // the op types are dynamic and the metric gauges generated when first seen. After
+    // that the values need to be cleared and set any new values present. This is so
+    // that the metrics system will report "known" values once seen. In operation, the
+    // number of types will be a fairly small set and should populate with normal operations.
+
+    // clear current values.
+    fateOpCounts.forEach((key, value) -> value.set(0));
+
+    // update new counts, create new gauge if first time seen.
+    Map<String,Long> opTypes = measurement.getOpTypeCounters();
+
+    opTypes.forEach((key, value) -> {
+      MutableGaugeLong g = fateOpCounts.get(key);
+      if (g != null) {
+        g.set(value);
+      } else {
+        g = super.getRegistry().newGauge(metricNameHelper(FATE_OP_TYPE_METRIC_PREFIX, key),
+            "By transaction op type count for " + key, value);
+      }
+
+      fateOpCounts.put(key, g);
 
 Review comment:
   I'll move the `fateOpCounts.put(key, g)` call so that only newly created instances are added to the map. The set is not necessary on gauge creation; the gauge is set to its initial value in the `newGauge(...)` method.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services