You are viewing a plain text version of this content. The canonical link for it is on the original archive page.
Posted to commits@accumulo.apache.org by el...@apache.org on 2017/10/12 17:11:16 UTC

[accumulo] branch 1.7 updated: [ACCUMULO-4591] Add replication latency metrics

This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch 1.7
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.7 by this push:
     new 27f1b1f  [ACCUMULO-4591] Add replication latency metrics
27f1b1f is described below

commit 27f1b1f26a0cd8997d0edfe0d00be98c9a711047
Author: Adam J. Shook <ad...@gmail.com>
AuthorDate: Tue Oct 10 11:10:17 2017 -0400

    [ACCUMULO-4591] Add replication latency metrics
    
    Closes apache/accumulo#305
    
    Signed-off-by: Josh Elser <el...@apache.org>
---
 .../server/replication/ReplicationUtil.java        |  30 ++++++
 .../master/metrics/Metrics2ReplicationMetrics.java |  92 ++++++++++++++---
 .../metrics/Metrics2ReplicationMetricsTest.java    | 109 +++++++++++++++++++++
 3 files changed, 217 insertions(+), 14 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
index a793b6a..0871924 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
@@ -40,6 +40,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
@@ -49,6 +50,7 @@ import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -184,6 +186,34 @@ public class ReplicationUtil {
     return counts;
   }
 
+  public Set<Path> getPendingReplicationPaths() {
+    final Set<Path> paths = new HashSet<>(); // returned empty if the scanner cannot be created
+    // Collects every file path that has a status entry; empty if the table can't be scanned
+    // Scan the replication table's status section; each row id is a file path
+    BatchScanner bs;
+    try {
+      bs = context.getConnector().createBatchScanner(ReplicationTable.NAME, Authorizations.EMPTY, 4); // 4 query threads
+    } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
+      log.debug("No replication table exists", e); // NOTE(review): also logged for AccumuloException/AccumuloSecurityException
+      return paths;
+    }
+    // Whole-table range; StatusSection.limit restricts the scan to status entries
+    bs.setRanges(Collections.singleton(new Range()));
+    StatusSection.limit(bs);
+    try {
+      Text buffer = new Text(); // reused for every entry's row id
+      for (Entry<Key,Value> entry : bs) {
+        Key k = entry.getKey();
+        k.getRow(buffer);
+        paths.add(new Path(buffer.toString())); // a Set: a row seen more than once is kept once
+      }
+    } finally {
+      bs.close(); // release the scanner's resources even if iteration fails
+    }
+
+    return paths;
+  }
+
   /**
    * Fetches the absolute path of the file to be replicated.
    *
diff --git a/server/master/src/main/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetrics.java b/server/master/src/main/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetrics.java
index 9e35c61..2739827 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetrics.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetrics.java
@@ -16,6 +16,9 @@
  */
 package org.apache.accumulo.master.metrics;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -26,12 +29,17 @@ import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.server.metrics.Metrics;
 import org.apache.accumulo.server.replication.ReplicationUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.lib.Interns;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableStat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  *
@@ -39,23 +47,41 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 public class Metrics2ReplicationMetrics implements Metrics, MetricsSource {
   public static final String NAME = MASTER_NAME + ",sub=Replication", DESCRIPTION = "Data-Center Replication Metrics", CONTEXT = "master",
       RECORD = "MasterReplication";
-  public static final String PENDING_FILES = "filesPendingReplication", NUM_PEERS = "numPeers", MAX_REPLICATION_THREADS = "maxReplicationThreads";
+  public static final String PENDING_FILES = "filesPendingReplication", NUM_PEERS = "numPeers", MAX_REPLICATION_THREADS = "maxReplicationThreads",
+      REPLICATION_QUEUE_TIME_QUANTILES = "replicationQueue10m", REPLICATION_QUEUE_TIME = "replicationQueue";
+
+  private final static Logger log = LoggerFactory.getLogger(Metrics2ReplicationMetrics.class);
 
   private final Master master;
   private final MetricsSystem system;
   private final MetricsRegistry registry;
   private final ReplicationUtil replicationUtil;
+  private final MutableQuantiles replicationQueueTimeQuantiles;
+  private final MutableStat replicationQueueTimeStat;
+  private final Map<Path,Long> pathModTimes;
 
   Metrics2ReplicationMetrics(Master master, MetricsSystem system) {
     this.master = master;
     this.system = system;
 
+    pathModTimes = new HashMap<>(); // tracked pending path -> its modification time
+
     registry = new MetricsRegistry(Interns.info(NAME, DESCRIPTION));
     replicationUtil = new ReplicationUtil(master);
+    replicationQueueTimeQuantiles = registry.newQuantiles(REPLICATION_QUEUE_TIME_QUANTILES, "Replication queue time quantiles in milliseconds", "ops",
+        "latency", 600); // 600-second rollover interval, matching the "10m" in the metric name
+    replicationQueueTimeStat = registry.newStat(REPLICATION_QUEUE_TIME, "Replication queue time statistics in milliseconds", "ops", "latency", true); // true = extended stats (min/max/stdev)
   }
 
   protected void snapshot() {
-    registry.add(PENDING_FILES, getNumFilesPendingReplication());
+    // Pending files and queue times need an online replication table and at least one peer
+    if (TableState.ONLINE == Tables.getTableState(master.getInstance(), ReplicationTable.ID) && !replicationUtil.getPeers().isEmpty()) {
+      registry.add(PENDING_FILES, getNumFilesPendingReplication());
+      addReplicationQueueTimeMetrics();
+    } else {
+      registry.add(PENDING_FILES, 0); // no queue-time samples are taken in this case
+    }
+    // Peer count and max replication threads are reported regardless of the table state
     registry.add(NUM_PEERS, getNumConfiguredPeers());
     registry.add(MAX_REPLICATION_THREADS, getMaxReplicationThreads());
   }
@@ -67,6 +93,8 @@ public class Metrics2ReplicationMetrics implements Metrics, MetricsSource {
     snapshot();
 
     registry.snapshot(builder, all);
+    replicationQueueTimeQuantiles.snapshot(builder, all);
+    replicationQueueTimeStat.snapshot(builder, all);
   }
 
   @Override
@@ -85,18 +113,6 @@ public class Metrics2ReplicationMetrics implements Metrics, MetricsSource {
   }
 
   protected int getNumFilesPendingReplication() {
-    if (TableState.ONLINE != Tables.getTableState(master.getInstance(), ReplicationTable.ID)) {
-      return 0;
-    }
-
-    // Get all of the configured replication peers
-    Map<String,String> peers = replicationUtil.getPeers();
-
-    // A quick lookup to see if have any replication peer configured
-    if (peers.isEmpty()) {
-      return 0;
-    }
-
     // The total set of configured targets
     Set<ReplicationTarget> allConfiguredTargets = replicationUtil.getReplicationTargets();
 
@@ -124,4 +140,52 @@ public class Metrics2ReplicationMetrics implements Metrics, MetricsSource {
   protected int getMaxReplicationThreads() {
     return replicationUtil.getMaxReplicationThreads(master.getMasterMonitorInfo());
   }
+
+  protected void addReplicationQueueTimeMetrics() {
+    Set<Path> paths = replicationUtil.getPendingReplicationPaths();
+    // A tracked path that is missing from this set is treated as having finished replicating
+    // Take "now" once: a finished file's queue time is this value minus the file's modification
+    // time. Completion is only noticed when metrics are polled, so a reported latency can
+    // overshoot the true value by up to one metrics polling period.
+    long currentTime = getCurrentTime();
+
+    // Start tracking any pending path whose modification time we don't know yet
+    for (Path path : paths) {
+      if (!pathModTimes.containsKey(path)) {
+        try {
+          pathModTimes.put(path, master.getFileSystem().getFileStatus(path).getModificationTime());
+        } catch (IOException e) {
+          // Best-effort: the path is simply not tracked yet and is retried on the next call
+          // while it is still pending. Likely causes: the file system is unavailable, or the
+          // file was deleted between the replication-table scan and this check.
+          log.trace("Failed to get file status for {}, file system is unavailable or it does not exist", path);
+        }
+      }
+    }
+
+    // Tracked paths that are no longer pending are the ones that finished replicating
+    Set<Path> deletedPaths = new HashSet<>(pathModTimes.keySet());
+    deletedPaths.removeAll(paths);
+
+    // Nothing finished since the last call: leave both metrics untouched
+    if (deletedPaths.isEmpty()) {
+      return;
+    }
+
+    replicationQueueTimeStat.resetMinMax(); // restart min/max from the files finished in this call
+
+    for (Path path : deletedPaths) {
+      // Stop tracking the path and record its queue time, clamped at 0 (e.g. if clocks disagree)
+      Long modTime = pathModTimes.remove(path);
+      if (modTime != null) { // expected non-null: deletedPaths was copied from pathModTimes' keys
+        long diff = Math.max(0, currentTime - modTime);
+        replicationQueueTimeQuantiles.add(diff);
+        replicationQueueTimeStat.add(diff);
+      }
+    }
+  }
+
+  protected long getCurrentTime() { // overridable clock; tests substitute a fixed "now"
+    return System.currentTimeMillis();
+  }
 }
diff --git a/server/master/src/test/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetricsTest.java b/server/master/src/test/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetricsTest.java
new file mode 100644
index 0000000..c8435b1
--- /dev/null
+++ b/server/master/src/test/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetricsTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.metrics;
+
+import java.lang.reflect.Field;
+
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.replication.ReplicationUtil;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableStat;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+public class Metrics2ReplicationMetricsTest {
+  private long currentTime = 1000L; // the "now" returned by TestMetrics2ReplicationMetrics
+
+  /**
+   * Subclass that pins getCurrentTime() to the enclosing test's currentTime field
+   */
+  public class TestMetrics2ReplicationMetrics extends Metrics2ReplicationMetrics { // non-static so it can read currentTime
+    TestMetrics2ReplicationMetrics(Master master, MetricsSystem system) {
+      super(master, system);
+    }
+
+    @Override
+    public long getCurrentTime() {
+      return currentTime;
+    }
+  }
+
+  @Test
+  public void testAddReplicationQueueTimeMetrics() throws Exception {
+    Master master = EasyMock.createMock(Master.class);
+    MetricsSystem system = EasyMock.createMock(MetricsSystem.class);
+    VolumeManager fileSystem = EasyMock.createMock(VolumeManager.class);
+    ReplicationUtil util = EasyMock.createMock(ReplicationUtil.class);
+    MutableStat stat = EasyMock.createMock(MutableStat.class);
+    MutableQuantiles quantiles = EasyMock.createMock(MutableQuantiles.class);
+
+    Path path1 = new Path("hdfs://localhost:9000/accumulo/wal/file1");
+    Path path2 = new Path("hdfs://localhost:9000/accumulo/wal/file2");
+
+    // First call: both paths are new, so each one gets a single file-status lookup
+    EasyMock.expect(util.getPendingReplicationPaths()).andReturn(ImmutableSet.of(path1, path2));
+    EasyMock.expect(master.getFileSystem()).andReturn(fileSystem);
+    EasyMock.expect(fileSystem.getFileStatus(path1)).andReturn(createStatus(100));
+    EasyMock.expect(master.getFileSystem()).andReturn(fileSystem);
+    EasyMock.expect(fileSystem.getFileStatus(path2)).andReturn(createStatus(200));
+
+    // Second call: path1 is no longer pending, so its latency is recorded; path2 stays tracked
+    EasyMock.expect(util.getPendingReplicationPaths()).andReturn(ImmutableSet.of(path2));
+
+    // Expect a call to reset the min/max
+    stat.resetMinMax();
+    EasyMock.expectLastCall();
+
+    // Expect path1's latency (currentTime minus its modification time of 100) in both metrics
+    quantiles.add(currentTime - 100);
+    EasyMock.expectLastCall();
+
+    stat.add(currentTime - 100);
+    EasyMock.expectLastCall();
+
+    EasyMock.replay(master, system, fileSystem, util, stat, quantiles);
+
+    Metrics2ReplicationMetrics metrics = new TestMetrics2ReplicationMetrics(master, system);
+
+    // Replace the objects the constructor created with the mocks
+    replaceField(metrics, "replicationUtil", util);
+    replaceField(metrics, "replicationQueueTimeQuantiles", quantiles);
+    replaceField(metrics, "replicationQueueTimeStat", stat);
+
+    // Two calls to this will initialize the map and then add metrics
+    metrics.addReplicationQueueTimeMetrics();
+    metrics.addReplicationQueueTimeMetrics();
+
+    EasyMock.verify(master, system, fileSystem, util, stat, quantiles);
+  }
+
+  private void replaceField(Object instance, String fieldName, Object target) throws NoSuchFieldException, IllegalAccessException {
+    Field field = instance.getClass().getSuperclass().getDeclaredField(fieldName); // fields are declared on Metrics2ReplicationMetrics
+    field.setAccessible(true);
+    field.set(instance, target);
+  }
+
+  private FileStatus createStatus(long modtime) { // only the modification time is read by the code under test
+    return new FileStatus(0, false, 0, 0, modtime, 0, null, null, null, null);
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@accumulo.apache.org" <co...@accumulo.apache.org>.