You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2021/12/15 22:15:04 UTC
[accumulo] branch main updated: Improvements to monitor external compactions page (#2385)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 02de510 Improvements to monitor external compactions page (#2385)
02de510 is described below
commit 02de5104c4c115d913e5547e1dd6f0b6d26390d1
Author: Mike Miller <mm...@apache.org>
AuthorDate: Wed Dec 15 17:14:50 2021 -0500
Improvements to monitor external compactions page (#2385)
* Add endpoint to monitor for external compactions details and modify
page to make ajax calls when getting the details of a running compaction
* Cache up to 50 of the details JSON objects in session storage
* Use ConcurrentHashMap for storing running compactions in Monitor
---
.../java/org/apache/accumulo/monitor/Monitor.java | 14 ++--
.../rest/compactions/external/CompactorInfo.java | 8 +-
.../rest/compactions/external/ECResource.java | 28 ++++++-
.../external/RunningCompactorDetails.java | 51 ++++++++++++
.../compactions/external/RunningCompactorInfo.java | 28 +++----
.../org/apache/accumulo/monitor/resources/js/ec.js | 91 ++++++++++++++++++----
.../compaction/ExternalCompactionProgressIT.java | 2 +-
7 files changed, 178 insertions(+), 44 deletions(-)
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index dcd7b37..25e6fbd 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -37,6 +37,7 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -587,7 +588,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
private final Map<HostAndPort,CompactionStats> allCompactions = new HashMap<>();
private final RecentLogs recentLogs = new RecentLogs();
private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
- private final Map<String,TExternalCompaction> ecRunningMap = new HashMap<>();
+ private final Map<String,TExternalCompaction> ecRunningMap = new ConcurrentHashMap<>();
private long scansFetchedNanos = 0L;
private long compactsFetchedNanos = 0L;
private long ecInfoFetchedNanos = 0L;
@@ -639,7 +640,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
* user fetches since RPC calls are going to the coordinator. This allows for fine grain updates
* of external compaction progress.
*/
- public synchronized Map<String,TExternalCompaction> getRunningInfo() {
+ public synchronized Map<String,TExternalCompaction> fetchRunningInfo() {
if (coordinatorHost.isEmpty()) {
throw new IllegalStateException(coordinatorMissingMsg);
}
@@ -655,15 +656,16 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
ecRunningMap.clear();
if (running.getCompactions() != null) {
- running.getCompactions().forEach((queue, ec) -> {
- log.trace("Found Compactions running on queue {} -> {}", queue, ec);
- ecRunningMap.put(queue, ec);
- });
+ ecRunningMap.putAll(running.getCompactions());
}
return ecRunningMap;
}
+ public Map<String,TExternalCompaction> getEcRunningMap() {
+ return ecRunningMap;
+ }
+
private CompactionCoordinatorService.Client getCoordinator(HostAndPort address) {
if (coordinatorClient == null) {
try {
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
index 1363ece..a7dc56a 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
@@ -21,9 +21,11 @@ package org.apache.accumulo.monitor.rest.compactions.external;
public class CompactorInfo {
// Variable names become JSON keys
- public long lastContact;
- public String server;
- public String queueName;
+ public long lastContact = 0L;
+ public String server = "";
+ public String queueName = "";
+
+ public CompactorInfo() {}
public CompactorInfo(long fetchedTimeMillis, String queue, String hostAndPort) {
lastContact = System.currentTimeMillis() - fetchedTimeMillis;
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
index 6c1a050..d7035d6 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
@@ -19,9 +19,11 @@
package org.apache.accumulo.monitor.rest.compactions.external;
import jakarta.inject.Inject;
+import jakarta.validation.constraints.NotNull;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;
import org.apache.accumulo.monitor.Monitor;
@@ -36,7 +38,7 @@ import org.slf4j.LoggerFactory;
@Path("/ec")
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public class ECResource {
- private static Logger log = LoggerFactory.getLogger(ECResource.class);
+ private static final Logger log = LoggerFactory.getLogger(ECResource.class);
@Inject
private Monitor monitor;
@@ -57,6 +59,28 @@ public class ECResource {
@Path("running")
@GET
public RunningCompactions getRunning() {
- return new RunningCompactions(monitor.getRunningInfo());
+ return new RunningCompactions(monitor.fetchRunningInfo());
+ }
+
+ @Path("details")
+ @GET
+ public RunningCompactorDetails getDetails(@QueryParam("ecid") @NotNull String ecid) {
+ // make parameter more user-friendly by ensuring the ecid prefix is present
+ ecid = ecid.replace("ecid:", "ECID:");
+ if (!ecid.startsWith("ECID:"))
+ ecid = "ECID:" + ecid;
+
+ var ecMap = monitor.getEcRunningMap();
+ var externalCompaction = ecMap.get(ecid);
+ if (externalCompaction == null) {
+ // map could be old so fetch all running compactions and try again
+ ecMap = monitor.fetchRunningInfo();
+ externalCompaction = ecMap.get(ecid);
+ if (externalCompaction == null) {
+ log.warn("Failed to find details for ECID: {}", ecid);
+ return new RunningCompactorDetails();
+ }
+ }
+ return new RunningCompactorDetails(System.currentTimeMillis(), ecid, externalCompaction);
}
}
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java
new file mode 100644
index 0000000..c6c3af2
--- /dev/null
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.InputFile;
+
+public class RunningCompactorDetails extends RunningCompactorInfo {
+ // Variable names become JSON keys
+ public List<CompactionInputFile> inputFiles = new ArrayList<>();
+ public String outputFile;
+
+ public RunningCompactorDetails() {
+ super();
+ }
+
+ public RunningCompactorDetails(long fetchedTime, String ecid, TExternalCompaction ec) {
+ super(fetchedTime, ecid, ec);
+ var job = ec.getJob();
+ inputFiles = convertInputFiles(job.files);
+ outputFile = job.outputFile;
+ }
+
+ private List<CompactionInputFile> convertInputFiles(List<InputFile> files) {
+ List<CompactionInputFile> list = new ArrayList<>();
+ files.forEach(f -> list
+ .add(new CompactionInputFile(f.metadataFileEntry, f.size, f.entries, f.timestamp)));
+ // sorted largest to smallest
+ list.sort((o1, o2) -> Long.compare(o2.size, o1.size));
+ return list;
+ }
+}
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java
index 711c135..7ef070f 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java
@@ -18,57 +18,47 @@
*/
package org.apache.accumulo.monitor.rest.compactions.external;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+import jakarta.validation.constraints.NotNull;
+
import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.tabletserver.thrift.InputFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RunningCompactorInfo extends CompactorInfo {
- private static Logger log = LoggerFactory.getLogger(RunningCompactorInfo.class);
+ private static final Logger log = LoggerFactory.getLogger(RunningCompactorInfo.class);
// Variable names become JSON keys
public String ecid;
public String kind;
public String tableId;
- public List<CompactionInputFile> inputFiles;
public int numFiles;
- public String outputFile;
public float progress = 0f;
public long duration;
public String status;
public long lastUpdate;
- public RunningCompactorInfo(long fetchedTime, String ecid, TExternalCompaction ec) {
+ public RunningCompactorInfo() {
+ super();
+ }
+
+ public RunningCompactorInfo(long fetchedTime, String ecid, @NotNull TExternalCompaction ec) {
super(fetchedTime, ec.getQueueName(), ec.getCompactor());
this.ecid = ecid;
var updates = ec.getUpdates();
var job = ec.getJob();
kind = job.getKind().name();
tableId = KeyExtent.fromThrift(job.extent).tableId().canonical();
- inputFiles = convertInputFiles(job.files);
- numFiles = inputFiles.size();
- outputFile = job.outputFile;
+ numFiles = job.files.size();
updateProgress(updates);
log.debug("Parsed running compaction {} for {} with progress = {}%", status, ecid, progress);
}
- private List<CompactionInputFile> convertInputFiles(List<InputFile> files) {
- List<CompactionInputFile> list = new ArrayList<>();
- files.forEach(f -> list
- .add(new CompactionInputFile(f.metadataFileEntry, f.size, f.entries, f.timestamp)));
- // sorted largest to smallest
- list.sort((o1, o2) -> Long.compare(o2.size, o1.size));
- return list;
- }
-
/**
* Calculate progress: the percentage of bytesRead out of bytesToBeCompacted of the last update.
* Also update the status.
diff --git a/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js
index 201edbd..dac238d 100644
--- a/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js
+++ b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js
@@ -27,6 +27,9 @@
* Creates active compactions table
*/
$(document).ready(function() {
+ if (sessionStorage.ecDetailsJSON === undefined) {
+ sessionStorage.ecDetailsJSON = JSON.stringify([]);
+ }
compactorsTable = $('#compactorsTable').DataTable({
"ajax": {
"url": '/rest/ec/compactors',
@@ -124,23 +127,26 @@
// Remove from the 'open' array
detailRows.splice( idx, 1 );
- }
- else {
+ } else {
var rci = row.data();
+ var ecid = rci.ecid;
+ var idSuffix = ecid.substring(ecid.length-5, ecid.length);
tr.addClass( 'details' );
// put all the information into html for a single row
- var htmlRow = "<table class='table table-bordered table-striped table-condensed'>"
+ var htmlRow = "<table class='table table-bordered table-striped table-condensed' id='table"+idSuffix+"'>"
htmlRow += "<thead><tr><th>#</th><th>Input Files</th><th>Size</th><th>Entries</th></tr></thead>";
- $.each( rci.inputFiles, function( key, value ) {
- htmlRow += "<tr><td>" + key + "</td>";
- htmlRow += "<td>" + value.metadataFileEntry + "</td>";
- htmlRow += "<td>" + bigNumberForSize(value.size) + "</td>";
- htmlRow += "<td>" + bigNumberForQuantity(value.entries) + "</td></tr>";
- });
- htmlRow += "</table>";
- htmlRow += "Output File: " + rci.outputFile + "<br>";
- htmlRow += rci.ecid;
+ htmlRow += "<tbody></tbody></table>";
+ htmlRow += "Output File: <span id='outputFile" + idSuffix + "'></span><br>";
+ htmlRow += ecid;
row.child(htmlRow).show();
+ // show the row then populate the table
+ var ecDetails = getDetailsFromStorage(idSuffix);
+ if (ecDetails.length === 0) {
+ getRunningDetails(ecid, idSuffix);
+ } else {
+ console.log("Got cached details for " + idSuffix);
+ populateDetails(ecDetails, idSuffix);
+ }
// Add to the 'open' array
if ( idx === -1 ) {
@@ -163,7 +169,11 @@
*/
function refreshECTables() {
getCompactionCoordinator();
- var ecInfo = JSON.parse(sessionStorage.ecInfo);
+ var ecInfo = sessionStorage.ecInfo === undefined ? [] :
+ JSON.parse(sessionStorage.ecInfo);
+ if (ecInfo.length === 0) {
+ return;
+ }
var ccAddress = ecInfo.server;
var numCompactors = ecInfo.numCompactors;
var lastContactTime = timeDuration(ecInfo.lastContact);
@@ -188,6 +198,61 @@
});
}
+ function getRunningDetails(ecid, idSuffix) {
+ var ajaxUrl = '/rest/ec/details?ecid=' + ecid;
+ console.log("Ajax call to " + ajaxUrl);
+ $.getJSON(ajaxUrl, function(data) {
+ populateDetails(data, idSuffix);
+ var detailsJSON = JSON.parse(sessionStorage.ecDetailsJSON);
+ if (detailsJSON === undefined) {
+ detailsJSON = [];
+ } else if (detailsJSON.length >= 50) {
+ // drop the oldest 25 from the sessionStorage to limit size of the cache
+ var newDetailsJSON = [];
+ $.each( detailsJSON, function( num, val ) {
+ if (num > 24) {
+ newDetailsJSON.push(val);
+ }
+ });
+ detailsJSON = newDetailsJSON;
+ }
+ detailsJSON.push({ key : idSuffix, value : data });
+ sessionStorage.ecDetailsJSON = JSON.stringify(detailsJSON);
+ });
+ }
+
+ function getDetailsFromStorage(idSuffix) {
+ var details = [];
+ var detailsJSON = JSON.parse(sessionStorage.ecDetailsJSON);
+ if (detailsJSON.length === 0) {
+ return details;
+ } else {
+ // details are stored as key value pairs in the JSON val
+ $.each( detailsJSON, function( num, val ) {
+ if (val.key === idSuffix) {
+ details = val.value;
+ }
+ });
+ return details;
+ }
+ }
+
+ function populateDetails(data, idSuffix) {
+ var tableId = 'table' + idSuffix;
+ clearTableBody(tableId);
+ $.each( data.inputFiles, function( key, value ) {
+ var items = [];
+ items.push(createCenterCell(key, key));
+ items.push(createCenterCell(value.metadataFileEntry, value.metadataFileEntry));
+ items.push(createCenterCell(value.size, bigNumberForSize(value.size)));
+ items.push(createCenterCell(value.entries, bigNumberForQuantity(value.entries)));
+ $('<tr/>', {
+ html: items.join('')
+ }).appendTo('#' + tableId + ' tbody');
+ });
+ $('#outputFile' + idSuffix).text(data.outputFile);
+ }
+
function refreshCompactors() {
console.log("Refresh compactors table.");
// user paging is not reset on reload
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index 305344c..20f974c 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -131,7 +131,7 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
RunningCompactorInfo rci = new RunningCompactorInfo(System.currentTimeMillis(), ecid, ec);
RunningCompactorInfo previousRci = runningMap.put(ecid, rci);
if (previousRci == null) {
- log.debug("New ECID {} with inputFiles: {}", ecid, rci.inputFiles);
+ log.debug("New ECID {} with inputFiles: {}", ecid, rci.numFiles);
} else {
if (rci.progress <= previousRci.progress) {
log.warn("{} did not progress. It went from {} to {}", ecid, previousRci.progress,