You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2022/01/03 13:57:21 UTC
[accumulo] branch main updated: Create RunningCompactionInfo in core for reuse (#2403)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 681139e Create RunningCompactionInfo in core for reuse (#2403)
681139e is described below
commit 681139e786787bdc5c9778bbb2a6062962e63f51
Author: Mike Miller <mm...@apache.org>
AuthorDate: Mon Jan 3 08:57:15 2022 -0500
Create RunningCompactionInfo in core for reuse (#2403)
* Move RunningCompactorInfo from the monitor to core as RunningCompactionInfo for reuse
* Make public fields of CompactorInfo final
---
.../core/util/compaction/RunningCompaction.java | 6 +-
.../util/compaction/RunningCompactionInfo.java | 93 +++++++++++-----------
.../rest/compactions/external/CompactorInfo.java | 8 +-
.../rest/compactions/external/ECResource.java | 5 +-
.../compactions/external/RunningCompactions.java | 6 +-
.../external/RunningCompactorDetails.java | 15 ++--
.../compaction/ExternalCompactionProgressIT.java | 8 +-
7 files changed, 69 insertions(+), 72 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java
index 5ed976d..d057da1 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
public class RunningCompaction {
@@ -32,12 +33,15 @@ public class RunningCompaction {
private final Map<Long,TCompactionStatusUpdate> updates = new TreeMap<>();
public RunningCompaction(TExternalCompactionJob job, String compactorAddress, String queueName) {
- super();
this.job = job;
this.compactorAddress = compactorAddress;
this.queueName = queueName;
}
+ public RunningCompaction(TExternalCompaction tEC) {
+ this(tEC.getJob(), tEC.getCompactor(), tEC.getQueueName());
+ }
+
public Map<Long,TCompactionStatusUpdate> getUpdates() {
return updates;
}
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java
similarity index 67%
rename from server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java
rename to core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java
index 7ef070f..4f67417 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java
@@ -16,60 +16,54 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.accumulo.monitor.rest.compactions.external;
+package org.apache.accumulo.core.util.compaction;
+
+import static java.util.Objects.requireNonNull;
-import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
-import jakarta.validation.constraints.NotNull;
-
import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RunningCompactorInfo extends CompactorInfo {
- private static final Logger log = LoggerFactory.getLogger(RunningCompactorInfo.class);
+public class RunningCompactionInfo {
+ private static final Logger log = LoggerFactory.getLogger(RunningCompactionInfo.class);
- // Variable names become JSON keys
- public String ecid;
- public String kind;
- public String tableId;
- public int numFiles;
- public float progress = 0f;
- public long duration;
- public String status;
- public long lastUpdate;
+ // DO NOT CHANGE Variable names - they map to JSON keys in the Monitor
+ public final String server;
+ public final String queueName;
+ public final String ecid;
+ public final String kind;
+ public final String tableId;
+ public final int numFiles;
+ public final float progress;
+ public final long duration;
+ public final String status;
+ public final long lastUpdate;
- public RunningCompactorInfo() {
- super();
- }
+ /**
+ * Info parsed about the external running compaction. Calculate the progress, which is defined as
+ * the percentage of entriesRead / entriesToBeCompacted of the last update.
+ */
+ public RunningCompactionInfo(TExternalCompaction ec) {
+ requireNonNull(ec, "Thrift external compaction is null.");
+ var updates = requireNonNull(ec.getUpdates(), "Missing Thrift external compaction updates");
+ var job = requireNonNull(ec.getJob(), "Thrift external compaction job is null");
- public RunningCompactorInfo(long fetchedTime, String ecid, @NotNull TExternalCompaction ec) {
- super(fetchedTime, ec.getQueueName(), ec.getCompactor());
- this.ecid = ecid;
- var updates = ec.getUpdates();
- var job = ec.getJob();
+ server = ec.getCompactor();
+ queueName = ec.getQueueName();
+ ecid = job.getExternalCompactionId();
kind = job.getKind().name();
- tableId = KeyExtent.fromThrift(job.extent).tableId().canonical();
- numFiles = job.files.size();
- updateProgress(updates);
- log.debug("Parsed running compaction {} for {} with progress = {}%", status, ecid, progress);
- }
+ tableId = KeyExtent.fromThrift(job.getExtent()).tableId().canonical();
+ numFiles = job.getFiles().size();
- /**
- * Calculate progress: the percentage of bytesRead out of bytesToBeCompacted of the last update.
- * Also update the status.
- */
- private void updateProgress(Map<Long,TCompactionStatusUpdate> updates) {
- if (updates.isEmpty()) {
- progress = 0f;
- status = "na";
- }
+ // parse the updates map
long nowMillis = System.currentTimeMillis();
long startedMillis = nowMillis;
+ float percent = 0f;
long updateMillis;
TCompactionStatusUpdate last;
@@ -92,28 +86,33 @@ public class RunningCompactorInfo extends CompactorInfo {
updateMillis = lastEntry.getKey();
} else {
log.debug("No updates found for {}", ecid);
+ lastUpdate = nowMillis;
+ progress = percent;
+ status = "na";
return;
}
long sinceLastUpdateSeconds = TimeUnit.MILLISECONDS.toSeconds(nowMillis - updateMillis);
log.debug("Time since Last update {} - {} = {} seconds", nowMillis, updateMillis,
sinceLastUpdateSeconds);
- if (sinceLastUpdateSeconds > 30) {
- log.debug("Compaction hasn't progressed from {} in {} seconds.", progress,
- sinceLastUpdateSeconds);
- }
- float percent;
var total = last.getEntriesToBeCompacted();
- if (total <= 0) {
- percent = 0f;
- } else {
+ if (total > 0) {
percent = (last.getEntriesRead() / (float) total) * 100;
}
-
lastUpdate = nowMillis - updateMillis;
- status = last.state.name();
progress = percent;
+
+ if (updates.isEmpty()) {
+ status = "na";
+ } else {
+ status = last.state.name();
+ }
+ log.debug("Parsed running compaction {} for {} with progress = {}%", status, ecid, progress);
+ if (sinceLastUpdateSeconds > 30) {
+ log.debug("Compaction hasn't progressed from {} in {} seconds.", progress,
+ sinceLastUpdateSeconds);
+ }
}
@Override
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
index a7dc56a..5e7c36f 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
@@ -21,11 +21,9 @@ package org.apache.accumulo.monitor.rest.compactions.external;
public class CompactorInfo {
// Variable names become JSON keys
- public long lastContact = 0L;
- public String server = "";
- public String queueName = "";
-
- public CompactorInfo() {}
+ public final long lastContact;
+ public final String server;
+ public final String queueName;
public CompactorInfo(long fetchedTimeMillis, String queue, String hostAndPort) {
lastContact = System.currentTimeMillis() - fetchedTimeMillis;
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
index 131abc8..b17a702 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
@@ -77,10 +77,9 @@ public class ECResource {
ecMap = monitor.fetchRunningInfo();
externalCompaction = ecMap.get(ecid);
if (externalCompaction == null) {
- log.warn("Failed to find details for ECID: {}", ecid);
- return new RunningCompactorDetails();
+ throw new IllegalStateException("Failed to find details for ECID: " + ecid);
}
}
- return new RunningCompactorDetails(System.currentTimeMillis(), ecid, externalCompaction);
+ return new RunningCompactorDetails(externalCompaction);
}
}
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java
index 294b91c..1f813f1 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java
@@ -23,16 +23,16 @@ import java.util.List;
import java.util.Map;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
+import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
public class RunningCompactions {
- public final List<RunningCompactorInfo> running = new ArrayList<>();
+ public final List<RunningCompactionInfo> running = new ArrayList<>();
public RunningCompactions(Map<String,TExternalCompaction> rMap) {
if (rMap != null) {
- var fetchedTime = System.currentTimeMillis();
for (var entry : rMap.entrySet()) {
- running.add(new RunningCompactorInfo(fetchedTime, entry.getKey(), entry.getValue()));
+ running.add(new RunningCompactionInfo(entry.getValue()));
}
}
}
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java
index c6c3af2..ad6cbff 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java
@@ -23,18 +23,15 @@ import java.util.List;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.tabletserver.thrift.InputFile;
+import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
-public class RunningCompactorDetails extends RunningCompactorInfo {
+public class RunningCompactorDetails extends RunningCompactionInfo {
// Variable names become JSON keys
- public List<CompactionInputFile> inputFiles = new ArrayList<>();
- public String outputFile;
+ public final List<CompactionInputFile> inputFiles;
+ public final String outputFile;
- public RunningCompactorDetails() {
- super();
- }
-
- public RunningCompactorDetails(long fetchedTime, String ecid, TExternalCompaction ec) {
- super(fetchedTime, ecid, ec);
+ public RunningCompactorDetails(TExternalCompaction ec) {
+ super(ec);
var job = ec.getJob();
inputFiles = convertInputFiles(job.files);
outputFile = job.outputFile;
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index 20f974c..27004d3 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -36,11 +36,11 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.monitor.rest.compactions.external.RunningCompactorInfo;
import org.apache.accumulo.test.functional.SlowIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TException;
@@ -63,7 +63,7 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
STARTED, QUARTER, HALF, THREE_QUARTERS
}
- Map<String,RunningCompactorInfo> runningMap = new HashMap<>();
+ Map<String,RunningCompactionInfo> runningMap = new HashMap<>();
List<EC_PROGRESS> progressList = new ArrayList<>();
private final AtomicBoolean compactionFinished = new AtomicBoolean(false);
@@ -128,8 +128,8 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
if (ecMap != null) {
ecMap.forEach((ecid, ec) -> {
// returns null if it's a new mapping
- RunningCompactorInfo rci = new RunningCompactorInfo(System.currentTimeMillis(), ecid, ec);
- RunningCompactorInfo previousRci = runningMap.put(ecid, rci);
+ RunningCompactionInfo rci = new RunningCompactionInfo(ec);
+ RunningCompactionInfo previousRci = runningMap.put(ecid, rci);
if (previousRci == null) {
log.debug("New ECID {} with inputFiles: {}", ecid, rci.numFiles);
} else {