You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2021/12/15 22:15:04 UTC

[accumulo] branch main updated: Improvements to monitor external compactions page (#2385)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 02de510  Improvements to monitor external compactions page (#2385)
02de510 is described below

commit 02de5104c4c115d913e5547e1dd6f0b6d26390d1
Author: Mike Miller <mm...@apache.org>
AuthorDate: Wed Dec 15 17:14:50 2021 -0500

    Improvements to monitor external compactions page (#2385)
    
    * Add an endpoint to the monitor for external compaction details and modify
    the page to make AJAX calls when fetching the details of a running compaction
    * Cache up to 50 of the details JSON objects in session storage
    * Use ConcurrentHashMap for storing running compactions in Monitor
---
 .../java/org/apache/accumulo/monitor/Monitor.java  | 14 ++--
 .../rest/compactions/external/CompactorInfo.java   |  8 +-
 .../rest/compactions/external/ECResource.java      | 28 ++++++-
 .../external/RunningCompactorDetails.java          | 51 ++++++++++++
 .../compactions/external/RunningCompactorInfo.java | 28 +++----
 .../org/apache/accumulo/monitor/resources/js/ec.js | 91 ++++++++++++++++++----
 .../compaction/ExternalCompactionProgressIT.java   |  2 +-
 7 files changed, 178 insertions(+), 44 deletions(-)

diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index dcd7b37..25e6fbd 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -37,6 +37,7 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -587,7 +588,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
   private final Map<HostAndPort,CompactionStats> allCompactions = new HashMap<>();
   private final RecentLogs recentLogs = new RecentLogs();
   private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
-  private final Map<String,TExternalCompaction> ecRunningMap = new HashMap<>();
+  private final Map<String,TExternalCompaction> ecRunningMap = new ConcurrentHashMap<>();
   private long scansFetchedNanos = 0L;
   private long compactsFetchedNanos = 0L;
   private long ecInfoFetchedNanos = 0L;
@@ -639,7 +640,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
    * user fetches since RPC calls are going to the coordinator. This allows for fine grain updates
    * of external compaction progress.
    */
-  public synchronized Map<String,TExternalCompaction> getRunningInfo() {
+  public synchronized Map<String,TExternalCompaction> fetchRunningInfo() {
     if (coordinatorHost.isEmpty()) {
       throw new IllegalStateException(coordinatorMissingMsg);
     }
@@ -655,15 +656,16 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
 
     ecRunningMap.clear();
     if (running.getCompactions() != null) {
-      running.getCompactions().forEach((queue, ec) -> {
-        log.trace("Found Compactions running on queue {} -> {}", queue, ec);
-        ecRunningMap.put(queue, ec);
-      });
+      ecRunningMap.putAll(running.getCompactions());
     }
 
     return ecRunningMap;
   }
 
+  public Map<String,TExternalCompaction> getEcRunningMap() {
+    return ecRunningMap;
+  }
+
   private CompactionCoordinatorService.Client getCoordinator(HostAndPort address) {
     if (coordinatorClient == null) {
       try {
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
index 1363ece..a7dc56a 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
@@ -21,9 +21,11 @@ package org.apache.accumulo.monitor.rest.compactions.external;
 public class CompactorInfo {
 
   // Variable names become JSON keys
-  public long lastContact;
-  public String server;
-  public String queueName;
+  public long lastContact = 0L;
+  public String server = "";
+  public String queueName = "";
+
+  public CompactorInfo() {}
 
   public CompactorInfo(long fetchedTimeMillis, String queue, String hostAndPort) {
     lastContact = System.currentTimeMillis() - fetchedTimeMillis;
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
index 6c1a050..d7035d6 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
@@ -19,9 +19,11 @@
 package org.apache.accumulo.monitor.rest.compactions.external;
 
 import jakarta.inject.Inject;
+import jakarta.validation.constraints.NotNull;
 import jakarta.ws.rs.GET;
 import jakarta.ws.rs.Path;
 import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.QueryParam;
 import jakarta.ws.rs.core.MediaType;
 
 import org.apache.accumulo.monitor.Monitor;
@@ -36,7 +38,7 @@ import org.slf4j.LoggerFactory;
 @Path("/ec")
 @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
 public class ECResource {
-  private static Logger log = LoggerFactory.getLogger(ECResource.class);
+  private static final Logger log = LoggerFactory.getLogger(ECResource.class);
 
   @Inject
   private Monitor monitor;
@@ -57,6 +59,28 @@ public class ECResource {
   @Path("running")
   @GET
   public RunningCompactions getRunning() {
-    return new RunningCompactions(monitor.getRunningInfo());
+    return new RunningCompactions(monitor.fetchRunningInfo());
+  }
+
+  @Path("details")
+  @GET
+  public RunningCompactorDetails getDetails(@QueryParam("ecid") @NotNull String ecid) {
+    // make parameter more user-friendly by ensuring the ecid prefix is present
+    ecid = ecid.replace("ecid:", "ECID:");
+    if (!ecid.startsWith("ECID:"))
+      ecid = "ECID:" + ecid;
+
+    var ecMap = monitor.getEcRunningMap();
+    var externalCompaction = ecMap.get(ecid);
+    if (externalCompaction == null) {
+      // map could be old so fetch all running compactions and try again
+      ecMap = monitor.fetchRunningInfo();
+      externalCompaction = ecMap.get(ecid);
+      if (externalCompaction == null) {
+        log.warn("Failed to find details for ECID: {}", ecid);
+        return new RunningCompactorDetails();
+      }
+    }
+    return new RunningCompactorDetails(System.currentTimeMillis(), ecid, externalCompaction);
   }
 }
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java
new file mode 100644
index 0000000..c6c3af2
--- /dev/null
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.InputFile;
+
+public class RunningCompactorDetails extends RunningCompactorInfo {
+  // Variable names become JSON keys
+  public List<CompactionInputFile> inputFiles = new ArrayList<>();
+  public String outputFile;
+
+  public RunningCompactorDetails() {
+    super();
+  }
+
+  public RunningCompactorDetails(long fetchedTime, String ecid, TExternalCompaction ec) {
+    super(fetchedTime, ecid, ec);
+    var job = ec.getJob();
+    inputFiles = convertInputFiles(job.files);
+    outputFile = job.outputFile;
+  }
+
+  private List<CompactionInputFile> convertInputFiles(List<InputFile> files) {
+    List<CompactionInputFile> list = new ArrayList<>();
+    files.forEach(f -> list
+        .add(new CompactionInputFile(f.metadataFileEntry, f.size, f.entries, f.timestamp)));
+    // sorted largest to smallest
+    list.sort((o1, o2) -> Long.compare(o2.size, o1.size));
+    return list;
+  }
+}
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java
index 711c135..7ef070f 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java
@@ -18,57 +18,47 @@
  */
 package org.apache.accumulo.monitor.rest.compactions.external;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
+import jakarta.validation.constraints.NotNull;
+
 import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
 import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.tabletserver.thrift.InputFile;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class RunningCompactorInfo extends CompactorInfo {
-  private static Logger log = LoggerFactory.getLogger(RunningCompactorInfo.class);
+  private static final Logger log = LoggerFactory.getLogger(RunningCompactorInfo.class);
 
   // Variable names become JSON keys
   public String ecid;
   public String kind;
   public String tableId;
-  public List<CompactionInputFile> inputFiles;
   public int numFiles;
-  public String outputFile;
   public float progress = 0f;
   public long duration;
   public String status;
   public long lastUpdate;
 
-  public RunningCompactorInfo(long fetchedTime, String ecid, TExternalCompaction ec) {
+  public RunningCompactorInfo() {
+    super();
+  }
+
+  public RunningCompactorInfo(long fetchedTime, String ecid, @NotNull TExternalCompaction ec) {
     super(fetchedTime, ec.getQueueName(), ec.getCompactor());
     this.ecid = ecid;
     var updates = ec.getUpdates();
     var job = ec.getJob();
     kind = job.getKind().name();
     tableId = KeyExtent.fromThrift(job.extent).tableId().canonical();
-    inputFiles = convertInputFiles(job.files);
-    numFiles = inputFiles.size();
-    outputFile = job.outputFile;
+    numFiles = job.files.size();
     updateProgress(updates);
     log.debug("Parsed running compaction {} for {} with progress = {}%", status, ecid, progress);
   }
 
-  private List<CompactionInputFile> convertInputFiles(List<InputFile> files) {
-    List<CompactionInputFile> list = new ArrayList<>();
-    files.forEach(f -> list
-        .add(new CompactionInputFile(f.metadataFileEntry, f.size, f.entries, f.timestamp)));
-    // sorted largest to smallest
-    list.sort((o1, o2) -> Long.compare(o2.size, o1.size));
-    return list;
-  }
-
   /**
    * Calculate progress: the percentage of bytesRead out of bytesToBeCompacted of the last update.
    * Also update the status.
diff --git a/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js
index 201edbd..dac238d 100644
--- a/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js
+++ b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js
@@ -27,6 +27,9 @@
   * Creates active compactions table
   */
  $(document).ready(function() {
+    if (sessionStorage.ecDetailsJSON === undefined) {
+       sessionStorage.ecDetailsJSON = JSON.stringify([]);
+    }
     compactorsTable = $('#compactorsTable').DataTable({
            "ajax": {
                "url": '/rest/ec/compactors',
@@ -124,23 +127,26 @@
 
              // Remove from the 'open' array
              detailRows.splice( idx, 1 );
-         }
-         else {
+         } else {
              var rci = row.data();
+             var ecid = rci.ecid;
+             var idSuffix = ecid.substring(ecid.length-5, ecid.length);
              tr.addClass( 'details' );
              // put all the information into html for a single row
-             var htmlRow = "<table class='table table-bordered table-striped table-condensed'>"
+             var htmlRow = "<table class='table table-bordered table-striped table-condensed' id='table"+idSuffix+"'>"
              htmlRow += "<thead><tr><th>#</th><th>Input Files</th><th>Size</th><th>Entries</th></tr></thead>";
-             $.each( rci.inputFiles, function( key, value ) {
-               htmlRow += "<tr><td>" + key + "</td>";
-               htmlRow += "<td>" + value.metadataFileEntry + "</td>";
-               htmlRow += "<td>" + bigNumberForSize(value.size) + "</td>";
-               htmlRow += "<td>" + bigNumberForQuantity(value.entries) + "</td></tr>";
-             });
-             htmlRow += "</table>";
-             htmlRow += "Output File: " + rci.outputFile + "<br>";
-             htmlRow += rci.ecid;
+             htmlRow += "<tbody></tbody></table>";
+             htmlRow += "Output File: <span id='outputFile" + idSuffix + "'></span><br>";
+             htmlRow += ecid;
              row.child(htmlRow).show();
+             // show the row then populate the table
+             var ecDetails = getDetailsFromStorage(idSuffix);
+             if (ecDetails.length === 0) {
+                getRunningDetails(ecid, idSuffix);
+             } else {
+                console.log("Got cached details for " + idSuffix);
+                populateDetails(ecDetails, idSuffix);
+             }
 
              // Add to the 'open' array
              if ( idx === -1 ) {
@@ -163,7 +169,11 @@
   */
  function refreshECTables() {
    getCompactionCoordinator();
-   var ecInfo = JSON.parse(sessionStorage.ecInfo);
+   var ecInfo = sessionStorage.ecInfo === undefined ? [] :
+      JSON.parse(sessionStorage.ecInfo);
+   if (ecInfo.length === 0) {
+      return;
+   }
    var ccAddress = ecInfo.server;
    var numCompactors = ecInfo.numCompactors;
    var lastContactTime = timeDuration(ecInfo.lastContact);
@@ -188,6 +198,61 @@
    });
  }
 
+ function getRunningDetails(ecid, idSuffix) {
+    var ajaxUrl = '/rest/ec/details?ecid=' + ecid;
+    console.log("Ajax call to " + ajaxUrl);
+    $.getJSON(ajaxUrl, function(data) {
+       populateDetails(data, idSuffix);
+       var detailsJSON = JSON.parse(sessionStorage.ecDetailsJSON);
+       if (detailsJSON === undefined) {
+          detailsJSON = [];
+       } else if (detailsJSON.length >= 50) {
+          // drop the oldest 25 from the sessionStorage to limit size of the cache
+          var newDetailsJSON = [];
+          $.each( detailsJSON, function( num, val ) {
+             if (num > 24) {
+                newDetailsJSON.push(val);
+             }
+          });
+          detailsJSON = newDetailsJSON;
+       }
+       detailsJSON.push({ key : idSuffix, value : data });
+       sessionStorage.ecDetailsJSON = JSON.stringify(detailsJSON);
+    });
+  }
+
+ function getDetailsFromStorage(idSuffix) {
+    var details = [];
+    var detailsJSON = JSON.parse(sessionStorage.ecDetailsJSON);
+    if (detailsJSON.length === 0) {
+       return details;
+    } else {
+       // details are stored as key value pairs in the JSON val
+       $.each( detailsJSON, function( num, val ) {
+          if (val.key === idSuffix) {
+             details = val.value;
+          }
+       });
+       return details;
+    }
+ }
+
+ function populateDetails(data, idSuffix) {
+     var tableId = 'table' + idSuffix;
+     clearTableBody(tableId);
+    $.each( data.inputFiles, function( key, value ) {
+       var items = [];
+       items.push(createCenterCell(key, key));
+       items.push(createCenterCell(value.metadataFileEntry, value.metadataFileEntry));
+       items.push(createCenterCell(value.size, bigNumberForSize(value.size)));
+       items.push(createCenterCell(value.entries, bigNumberForQuantity(value.entries)));
+       $('<tr/>', {
+           html: items.join('')
+         }).appendTo('#' + tableId + ' tbody');
+    });
+    $('#outputFile' + idSuffix).text(data.outputFile);
+ }
+
  function refreshCompactors() {
    console.log("Refresh compactors table.");
    // user paging is not reset on reload
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index 305344c..20f974c 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -131,7 +131,7 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
         RunningCompactorInfo rci = new RunningCompactorInfo(System.currentTimeMillis(), ecid, ec);
         RunningCompactorInfo previousRci = runningMap.put(ecid, rci);
         if (previousRci == null) {
-          log.debug("New ECID {} with inputFiles: {}", ecid, rci.inputFiles);
+          log.debug("New ECID {} with inputFiles: {}", ecid, rci.numFiles);
         } else {
           if (rci.progress <= previousRci.progress) {
             log.warn("{} did not progress. It went from {} to {}", ecid, previousRci.progress,