You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2022/01/03 13:57:21 UTC

[accumulo] branch main updated: Create RunningCompactionInfo in core for reuse (#2403)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 681139e  Create RunningCompactionInfo in core for reuse (#2403)
681139e is described below

commit 681139e786787bdc5c9778bbb2a6062962e63f51
Author: Mike Miller <mm...@apache.org>
AuthorDate: Mon Jan 3 08:57:15 2022 -0500

    Create RunningCompactionInfo in core for reuse (#2403)
    
    * Create new object in core for reuse
    * Make private vars of CompactorInfo final
---
 .../core/util/compaction/RunningCompaction.java    |  6 +-
 .../util/compaction/RunningCompactionInfo.java     | 93 +++++++++++-----------
 .../rest/compactions/external/CompactorInfo.java   |  8 +-
 .../rest/compactions/external/ECResource.java      |  5 +-
 .../compactions/external/RunningCompactions.java   |  6 +-
 .../external/RunningCompactorDetails.java          | 15 ++--
 .../compaction/ExternalCompactionProgressIT.java   |  8 +-
 7 files changed, 69 insertions(+), 72 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java
index 5ed976d..d057da1 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
 import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 
 public class RunningCompaction {
@@ -32,12 +33,15 @@ public class RunningCompaction {
   private final Map<Long,TCompactionStatusUpdate> updates = new TreeMap<>();
 
   public RunningCompaction(TExternalCompactionJob job, String compactorAddress, String queueName) {
-    super();
     this.job = job;
     this.compactorAddress = compactorAddress;
     this.queueName = queueName;
   }
 
+  public RunningCompaction(TExternalCompaction tEC) {
+    this(tEC.getJob(), tEC.getCompactor(), tEC.getQueueName());
+  }
+
   public Map<Long,TCompactionStatusUpdate> getUpdates() {
     return updates;
   }
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java
similarity index 67%
rename from server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java
rename to core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java
index 7ef070f..4f67417 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java
@@ -16,60 +16,54 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.monitor.rest.compactions.external;
+package org.apache.accumulo.core.util.compaction;
+
+import static java.util.Objects.requireNonNull;
 
-import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
-import jakarta.validation.constraints.NotNull;
-
 import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
 import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RunningCompactorInfo extends CompactorInfo {
-  private static final Logger log = LoggerFactory.getLogger(RunningCompactorInfo.class);
+public class RunningCompactionInfo {
+  private static final Logger log = LoggerFactory.getLogger(RunningCompactionInfo.class);
 
-  // Variable names become JSON keys
-  public String ecid;
-  public String kind;
-  public String tableId;
-  public int numFiles;
-  public float progress = 0f;
-  public long duration;
-  public String status;
-  public long lastUpdate;
+  // DO NOT CHANGE Variable names - they map to JSON keys in the Monitor
+  public final String server;
+  public final String queueName;
+  public final String ecid;
+  public final String kind;
+  public final String tableId;
+  public final int numFiles;
+  public final float progress;
+  public final long duration;
+  public final String status;
+  public final long lastUpdate;
 
-  public RunningCompactorInfo() {
-    super();
-  }
+  /**
+   * Info parsed about the external running compaction. Calculate the progress, which is defined as
+   * the percentage of entriesRead / entriesToBeCompacted of the last update.
+   */
+  public RunningCompactionInfo(TExternalCompaction ec) {
+    requireNonNull(ec, "Thrift external compaction is null.");
+    var updates = requireNonNull(ec.getUpdates(), "Missing Thrift external compaction updates");
+    var job = requireNonNull(ec.getJob(), "Thrift external compaction job is null");
 
-  public RunningCompactorInfo(long fetchedTime, String ecid, @NotNull TExternalCompaction ec) {
-    super(fetchedTime, ec.getQueueName(), ec.getCompactor());
-    this.ecid = ecid;
-    var updates = ec.getUpdates();
-    var job = ec.getJob();
+    server = ec.getCompactor();
+    queueName = ec.getQueueName();
+    ecid = job.getExternalCompactionId();
     kind = job.getKind().name();
-    tableId = KeyExtent.fromThrift(job.extent).tableId().canonical();
-    numFiles = job.files.size();
-    updateProgress(updates);
-    log.debug("Parsed running compaction {} for {} with progress = {}%", status, ecid, progress);
-  }
+    tableId = KeyExtent.fromThrift(job.getExtent()).tableId().canonical();
+    numFiles = job.getFiles().size();
 
-  /**
-   * Calculate progress: the percentage of bytesRead out of bytesToBeCompacted of the last update.
-   * Also update the status.
-   */
-  private void updateProgress(Map<Long,TCompactionStatusUpdate> updates) {
-    if (updates.isEmpty()) {
-      progress = 0f;
-      status = "na";
-    }
+    // parse the updates map
     long nowMillis = System.currentTimeMillis();
     long startedMillis = nowMillis;
+    float percent = 0f;
     long updateMillis;
     TCompactionStatusUpdate last;
 
@@ -92,28 +86,33 @@ public class RunningCompactorInfo extends CompactorInfo {
       updateMillis = lastEntry.getKey();
     } else {
       log.debug("No updates found for {}", ecid);
+      lastUpdate = nowMillis;
+      progress = percent;
+      status = "na";
       return;
     }
 
     long sinceLastUpdateSeconds = TimeUnit.MILLISECONDS.toSeconds(nowMillis - updateMillis);
     log.debug("Time since Last update {} - {} = {} seconds", nowMillis, updateMillis,
         sinceLastUpdateSeconds);
-    if (sinceLastUpdateSeconds > 30) {
-      log.debug("Compaction hasn't progressed from {} in {} seconds.", progress,
-          sinceLastUpdateSeconds);
-    }
 
-    float percent;
     var total = last.getEntriesToBeCompacted();
-    if (total <= 0) {
-      percent = 0f;
-    } else {
+    if (total > 0) {
       percent = (last.getEntriesRead() / (float) total) * 100;
     }
-
     lastUpdate = nowMillis - updateMillis;
-    status = last.state.name();
     progress = percent;
+
+    if (updates.isEmpty()) {
+      status = "na";
+    } else {
+      status = last.state.name();
+    }
+    log.debug("Parsed running compaction {} for {} with progress = {}%", status, ecid, progress);
+    if (sinceLastUpdateSeconds > 30) {
+      log.debug("Compaction hasn't progressed from {} in {} seconds.", progress,
+          sinceLastUpdateSeconds);
+    }
   }
 
   @Override
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
index a7dc56a..5e7c36f 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
@@ -21,11 +21,9 @@ package org.apache.accumulo.monitor.rest.compactions.external;
 public class CompactorInfo {
 
   // Variable names become JSON keys
-  public long lastContact = 0L;
-  public String server = "";
-  public String queueName = "";
-
-  public CompactorInfo() {}
+  public final long lastContact;
+  public final String server;
+  public final String queueName;
 
   public CompactorInfo(long fetchedTimeMillis, String queue, String hostAndPort) {
     lastContact = System.currentTimeMillis() - fetchedTimeMillis;
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
index 131abc8..b17a702 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
@@ -77,10 +77,9 @@ public class ECResource {
       ecMap = monitor.fetchRunningInfo();
       externalCompaction = ecMap.get(ecid);
       if (externalCompaction == null) {
-        log.warn("Failed to find details for ECID: {}", ecid);
-        return new RunningCompactorDetails();
+        throw new IllegalStateException("Failed to find details for ECID: " + ecid);
       }
     }
-    return new RunningCompactorDetails(System.currentTimeMillis(), ecid, externalCompaction);
+    return new RunningCompactorDetails(externalCompaction);
   }
 }
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java
index 294b91c..1f813f1 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java
@@ -23,16 +23,16 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
+import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
 
 public class RunningCompactions {
 
-  public final List<RunningCompactorInfo> running = new ArrayList<>();
+  public final List<RunningCompactionInfo> running = new ArrayList<>();
 
   public RunningCompactions(Map<String,TExternalCompaction> rMap) {
     if (rMap != null) {
-      var fetchedTime = System.currentTimeMillis();
       for (var entry : rMap.entrySet()) {
-        running.add(new RunningCompactorInfo(fetchedTime, entry.getKey(), entry.getValue()));
+        running.add(new RunningCompactionInfo(entry.getValue()));
       }
     }
   }
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java
index c6c3af2..ad6cbff 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java
@@ -23,18 +23,15 @@ import java.util.List;
 
 import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
 import org.apache.accumulo.core.tabletserver.thrift.InputFile;
+import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
 
-public class RunningCompactorDetails extends RunningCompactorInfo {
+public class RunningCompactorDetails extends RunningCompactionInfo {
   // Variable names become JSON keys
-  public List<CompactionInputFile> inputFiles = new ArrayList<>();
-  public String outputFile;
+  public final List<CompactionInputFile> inputFiles;
+  public final String outputFile;
 
-  public RunningCompactorDetails() {
-    super();
-  }
-
-  public RunningCompactorDetails(long fetchedTime, String ecid, TExternalCompaction ec) {
-    super(fetchedTime, ecid, ec);
+  public RunningCompactorDetails(TExternalCompaction ec) {
+    super(ec);
     var job = ec.getJob();
     inputFiles = convertInputFiles(job.files);
     outputFile = job.outputFile;
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index 20f974c..27004d3 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -36,11 +36,11 @@ import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.compaction.thrift.TCompactionState;
 import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.monitor.rest.compactions.external.RunningCompactorInfo;
 import org.apache.accumulo.test.functional.SlowIterator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TException;
@@ -63,7 +63,7 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
     STARTED, QUARTER, HALF, THREE_QUARTERS
   }
 
-  Map<String,RunningCompactorInfo> runningMap = new HashMap<>();
+  Map<String,RunningCompactionInfo> runningMap = new HashMap<>();
   List<EC_PROGRESS> progressList = new ArrayList<>();
 
   private final AtomicBoolean compactionFinished = new AtomicBoolean(false);
@@ -128,8 +128,8 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
     if (ecMap != null) {
       ecMap.forEach((ecid, ec) -> {
         // returns null if it's a new mapping
-        RunningCompactorInfo rci = new RunningCompactorInfo(System.currentTimeMillis(), ecid, ec);
-        RunningCompactorInfo previousRci = runningMap.put(ecid, rci);
+        RunningCompactionInfo rci = new RunningCompactionInfo(ec);
+        RunningCompactionInfo previousRci = runningMap.put(ecid, rci);
         if (previousRci == null) {
           log.debug("New ECID {} with inputFiles: {}", ecid, rci.numFiles);
         } else {