You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@accumulo.apache.org by el...@apache.org on 2017/10/12 17:11:16 UTC
[accumulo] branch 1.7 updated: [ACCUMULO-4591] Add replication latency metrics
This is an automated email from the ASF dual-hosted git repository.
elserj pushed a commit to branch 1.7
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1.7 by this push:
new 27f1b1f [ACCUMULO-4591] Add replication latency metrics
27f1b1f is described below
commit 27f1b1f26a0cd8997d0edfe0d00be98c9a711047
Author: Adam J. Shook <ad...@gmail.com>
AuthorDate: Tue Oct 10 11:10:17 2017 -0400
[ACCUMULO-4591] Add replication latency metrics
Closes apache/accumulo#305
Signed-off-by: Josh Elser <el...@apache.org>
---
.../server/replication/ReplicationUtil.java | 30 ++++++
.../master/metrics/Metrics2ReplicationMetrics.java | 92 ++++++++++++++---
.../metrics/Metrics2ReplicationMetricsTest.java | 109 +++++++++++++++++++++
3 files changed, 217 insertions(+), 14 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
index a793b6a..0871924 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
@@ -40,6 +40,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
@@ -49,6 +50,7 @@ import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -184,6 +186,34 @@ public class ReplicationUtil {
return counts;
}
+ public Set<Path> getPendingReplicationPaths() {
+ final Set<Path> paths = new HashSet<>();
+
+ // Read over the queued work
+ BatchScanner bs;
+ try {
+ bs = context.getConnector().createBatchScanner(ReplicationTable.NAME, Authorizations.EMPTY, 4);
+ } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
+ log.debug("No replication table exists", e);
+ return paths;
+ }
+
+ bs.setRanges(Collections.singleton(new Range()));
+ StatusSection.limit(bs);
+ try {
+ Text buffer = new Text();
+ for (Entry<Key,Value> entry : bs) {
+ Key k = entry.getKey();
+ k.getRow(buffer);
+ paths.add(new Path(buffer.toString()));
+ }
+ } finally {
+ bs.close();
+ }
+
+ return paths;
+ }
+
/**
* Fetches the absolute path of the file to be replicated.
*
diff --git a/server/master/src/main/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetrics.java b/server/master/src/main/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetrics.java
index 9e35c61..2739827 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetrics.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetrics.java
@@ -16,6 +16,9 @@
*/
package org.apache.accumulo.master.metrics;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -26,12 +29,17 @@ import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.server.metrics.Metrics;
import org.apache.accumulo.server.replication.ReplicationUtil;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableStat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
*
@@ -39,23 +47,41 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
public class Metrics2ReplicationMetrics implements Metrics, MetricsSource {
public static final String NAME = MASTER_NAME + ",sub=Replication", DESCRIPTION = "Data-Center Replication Metrics", CONTEXT = "master",
RECORD = "MasterReplication";
- public static final String PENDING_FILES = "filesPendingReplication", NUM_PEERS = "numPeers", MAX_REPLICATION_THREADS = "maxReplicationThreads";
+ public static final String PENDING_FILES = "filesPendingReplication", NUM_PEERS = "numPeers", MAX_REPLICATION_THREADS = "maxReplicationThreads",
+ REPLICATION_QUEUE_TIME_QUANTILES = "replicationQueue10m", REPLICATION_QUEUE_TIME = "replicationQueue";
+
+ private final static Logger log = LoggerFactory.getLogger(Metrics2ReplicationMetrics.class);
private final Master master;
private final MetricsSystem system;
private final MetricsRegistry registry;
private final ReplicationUtil replicationUtil;
+ private final MutableQuantiles replicationQueueTimeQuantiles;
+ private final MutableStat replicationQueueTimeStat;
+ private final Map<Path,Long> pathModTimes;
Metrics2ReplicationMetrics(Master master, MetricsSystem system) {
this.master = master;
this.system = system;
+ pathModTimes = new HashMap<>();
+
registry = new MetricsRegistry(Interns.info(NAME, DESCRIPTION));
replicationUtil = new ReplicationUtil(master);
+ replicationQueueTimeQuantiles = registry.newQuantiles(REPLICATION_QUEUE_TIME_QUANTILES, "Replication queue time quantiles in milliseconds", "ops",
+ "latency", 600);
+ replicationQueueTimeStat = registry.newStat(REPLICATION_QUEUE_TIME, "Replication queue time statistics in milliseconds", "ops", "latency", true);
}
protected void snapshot() {
- registry.add(PENDING_FILES, getNumFilesPendingReplication());
+ // Only add these metrics if the replication table is online and there are peers
+ if (TableState.ONLINE == Tables.getTableState(master.getInstance(), ReplicationTable.ID) && !replicationUtil.getPeers().isEmpty()) {
+ registry.add(PENDING_FILES, getNumFilesPendingReplication());
+ addReplicationQueueTimeMetrics();
+ } else {
+ registry.add(PENDING_FILES, 0);
+ }
+
registry.add(NUM_PEERS, getNumConfiguredPeers());
registry.add(MAX_REPLICATION_THREADS, getMaxReplicationThreads());
}
@@ -67,6 +93,8 @@ public class Metrics2ReplicationMetrics implements Metrics, MetricsSource {
snapshot();
registry.snapshot(builder, all);
+ replicationQueueTimeQuantiles.snapshot(builder, all);
+ replicationQueueTimeStat.snapshot(builder, all);
}
@Override
@@ -85,18 +113,6 @@ public class Metrics2ReplicationMetrics implements Metrics, MetricsSource {
}
protected int getNumFilesPendingReplication() {
- if (TableState.ONLINE != Tables.getTableState(master.getInstance(), ReplicationTable.ID)) {
- return 0;
- }
-
- // Get all of the configured replication peers
- Map<String,String> peers = replicationUtil.getPeers();
-
- // A quick lookup to see if have any replication peer configured
- if (peers.isEmpty()) {
- return 0;
- }
-
// The total set of configured targets
Set<ReplicationTarget> allConfiguredTargets = replicationUtil.getReplicationTargets();
@@ -124,4 +140,52 @@ public class Metrics2ReplicationMetrics implements Metrics, MetricsSource {
protected int getMaxReplicationThreads() {
return replicationUtil.getMaxReplicationThreads(master.getMasterMonitorInfo());
}
+
+ protected void addReplicationQueueTimeMetrics() {
+ Set<Path> paths = replicationUtil.getPendingReplicationPaths();
+
+ // We'll take a snap of the current time and use this as a diff between any deleted
+ // file's modification time and now. The reported latency will be off by at most a
+ // number of seconds equal to the metric polling period
+ long currentTime = getCurrentTime();
+
+ // Iterate through all the pending paths and update the mod time if we don't know it yet
+ for (Path path : paths) {
+ if (!pathModTimes.containsKey(path)) {
+ try {
+ pathModTimes.put(path, master.getFileSystem().getFileStatus(path).getModificationTime());
+ } catch (IOException e) {
+ // Ignore all IOExceptions
+ // Either the system is unavailable or the file was deleted
+ // since the initial scan and this check
+ log.trace("Failed to get file status for {}, file system is unavailable or it does not exist", path);
+ }
+ }
+ }
+
+ // Remove all currently pending files
+ Set<Path> deletedPaths = new HashSet<>(pathModTimes.keySet());
+ deletedPaths.removeAll(paths);
+
+ // Exit early if we have no replicated files to report on
+ if (deletedPaths.isEmpty()) {
+ return;
+ }
+
+ replicationQueueTimeStat.resetMinMax();
+
+ for (Path path : deletedPaths) {
+ // Remove this path and add the latency
+ Long modTime = pathModTimes.remove(path);
+ if (modTime != null) {
+ long diff = Math.max(0, currentTime - modTime);
+ replicationQueueTimeQuantiles.add(diff);
+ replicationQueueTimeStat.add(diff);
+ }
+ }
+ }
+
+ protected long getCurrentTime() {
+ return System.currentTimeMillis();
+ }
}
diff --git a/server/master/src/test/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetricsTest.java b/server/master/src/test/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetricsTest.java
new file mode 100644
index 0000000..c8435b1
--- /dev/null
+++ b/server/master/src/test/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetricsTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.metrics;
+
+import java.lang.reflect.Field;
+
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.replication.ReplicationUtil;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableStat;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+public class Metrics2ReplicationMetricsTest {
+ private long currentTime = 1000L;
+
+ /**
+ * Extend the class to override the current time for testing
+ */
+ public class TestMetrics2ReplicationMetrics extends Metrics2ReplicationMetrics {
+ TestMetrics2ReplicationMetrics(Master master, MetricsSystem system) {
+ super(master, system);
+ }
+
+ @Override
+ public long getCurrentTime() {
+ return currentTime;
+ }
+ }
+
+ @Test
+ public void testAddReplicationQueueTimeMetrics() throws Exception {
+ Master master = EasyMock.createMock(Master.class);
+ MetricsSystem system = EasyMock.createMock(MetricsSystem.class);
+ VolumeManager fileSystem = EasyMock.createMock(VolumeManager.class);
+ ReplicationUtil util = EasyMock.createMock(ReplicationUtil.class);
+ MutableStat stat = EasyMock.createMock(MutableStat.class);
+ MutableQuantiles quantiles = EasyMock.createMock(MutableQuantiles.class);
+
+ Path path1 = new Path("hdfs://localhost:9000/accumulo/wal/file1");
+ Path path2 = new Path("hdfs://localhost:9000/accumulo/wal/file2");
+
+ // First call will initialize the map of paths to modification time
+ EasyMock.expect(util.getPendingReplicationPaths()).andReturn(ImmutableSet.of(path1, path2));
+ EasyMock.expect(master.getFileSystem()).andReturn(fileSystem);
+ EasyMock.expect(fileSystem.getFileStatus(path1)).andReturn(createStatus(100));
+ EasyMock.expect(master.getFileSystem()).andReturn(fileSystem);
+ EasyMock.expect(fileSystem.getFileStatus(path2)).andReturn(createStatus(200));
+
+ // Second call will recognize the missing path1 and add the latency stat
+ EasyMock.expect(util.getPendingReplicationPaths()).andReturn(ImmutableSet.of(path2));
+
+ // Expect a call to reset the min/max
+ stat.resetMinMax();
+ EasyMock.expectLastCall();
+
+ // Expect the calls of adding the stats
+ quantiles.add(currentTime - 100);
+ EasyMock.expectLastCall();
+
+ stat.add(currentTime - 100);
+ EasyMock.expectLastCall();
+
+ EasyMock.replay(master, system, fileSystem, util, stat, quantiles);
+
+ Metrics2ReplicationMetrics metrics = new TestMetrics2ReplicationMetrics(master, system);
+
+ // Inject our mock objects
+ replaceField(metrics, "replicationUtil", util);
+ replaceField(metrics, "replicationQueueTimeQuantiles", quantiles);
+ replaceField(metrics, "replicationQueueTimeStat", stat);
+
+ // Two calls to this will initialize the map and then add metrics
+ metrics.addReplicationQueueTimeMetrics();
+ metrics.addReplicationQueueTimeMetrics();
+
+ EasyMock.verify(master, system, fileSystem, util, stat, quantiles);
+ }
+
+ private void replaceField(Object instance, String fieldName, Object target) throws NoSuchFieldException, IllegalAccessException {
+ Field field = instance.getClass().getSuperclass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(instance, target);
+ }
+
+ private FileStatus createStatus(long modtime) {
+ return new FileStatus(0, false, 0, 0, modtime, 0, null, null, null, null);
+ }
+}
--
To stop receiving notification emails like this one, please contact
"commits@accumulo.apache.org" <co...@accumulo.apache.org>.