You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2021/09/16 15:50:31 UTC
[accumulo] branch main updated: Rename CompactionTask to
InternalJob (#2270)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new f73af60 Rename CompactionTask to InternalJob (#2270)
f73af60 is described below
commit f73af60b86e452ef9cd429b56ccc53e93c602eda
Author: Mike Miller <mm...@apache.org>
AuthorDate: Thu Sep 16 11:50:25 2021 -0400
Rename CompactionTask to InternalJob (#2270)
* Rename CompactionTask to InternalJob to be consistent with ExternalJob
* Rename variables to match InternalJob type
* Fix some wording in comments
* Add javadoc comments
---
.../tserver/compactions/CompactionExecutor.java | 6 +++
.../compactions/ExternalCompactionExecutor.java | 35 ++++++------
.../compactions/InternalCompactionExecutor.java | 63 +++++++++++-----------
.../accumulo/tserver/compactions/SubmittedJob.java | 3 ++
4 files changed, 61 insertions(+), 46 deletions(-)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java
index 85581a8..0e6e823 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java
@@ -24,6 +24,12 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+/**
+ * A non-pluggable component that executes compactions using multiple threads and has a priority
+ * queue. There are two types: Internal and External. The {@link InternalCompactionExecutor} runs
+ * within the Accumulo tserver process. The {@link ExternalCompactionExecutor} runs compactions
+ * outside the tserver.
+ */
public interface CompactionExecutor {
SubmittedJob submit(CompactionServiceId csid, CompactionJob job, Compactable compactable,
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
index 7d2dcd5..7acf20f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
@@ -41,13 +41,16 @@ import org.apache.accumulo.tserver.compactions.SubmittedJob.Status;
import com.google.common.base.Preconditions;
+/**
+ * Runs compactions outside the tserver, typically by a process external to Accumulo.
+ */
public class ExternalCompactionExecutor implements CompactionExecutor {
- // This exist to provide an accurate count of queued compactions for metrics. The PriorityQueue is
+ // This set provides an accurate count of queued compactions for metrics. The PriorityQueue is
// not used because its size may be off due to it containing cancelled compactions. The collection
// below should not contain cancelled compactions. A concurrent set was not used because those do
// not have constant time size operations.
- private Set<ExternalJob> queuedTask = Collections.synchronizedSet(new HashSet<>());
+ private final Set<ExternalJob> queuedJob = Collections.synchronizedSet(new HashSet<>());
private class ExternalJob extends SubmittedJob {
private final AtomicReference<Status> status = new AtomicReference<>(Status.QUEUED);
@@ -61,7 +64,7 @@ public class ExternalCompactionExecutor implements CompactionExecutor {
super(job);
this.compactable = compactable;
this.csid = csid;
- queuedTask.add(this);
+ queuedJob.add(this);
this.timeCreated = System.currentTimeMillis();
}
@@ -83,11 +86,11 @@ public class ExternalCompactionExecutor implements CompactionExecutor {
if (expectedStatus == Status.QUEUED) {
canceled = status.compareAndSet(expectedStatus, Status.CANCELED);
if (canceled) {
- queuedTask.remove(this);
+ queuedJob.remove(this);
}
if (canceled && cancelCount.incrementAndGet() % 1024 == 0) {
- // Occasionally clean the queue of canceled tasks that have hung around because of their
+ // Occasionally clean the queue of canceled jobs that have hung around because of their
// low priority. This runs periodically, instead of every time something is canceled, to
// avoid hurting performance.
queue.removeIf(ej -> ej.getStatus() == Status.CANCELED);
@@ -106,8 +109,8 @@ public class ExternalCompactionExecutor implements CompactionExecutor {
}
}
- private PriorityBlockingQueue<ExternalJob> queue;
- private CompactionExecutorId ceid;
+ private final PriorityBlockingQueue<ExternalJob> queue;
+ private final CompactionExecutorId ceid;
public ExternalCompactionExecutor(CompactionExecutorId ceid) {
this.ceid = ceid;
@@ -116,8 +119,8 @@ public class ExternalCompactionExecutor implements CompactionExecutor {
priorityComparator =
priorityComparator.reversed().thenComparingLong(ExternalJob::getTimeCreated);
- this.queue = new PriorityBlockingQueue<ExternalJob>(100,
- priorityComparator.thenComparing(priorityComparator));
+ this.queue =
+ new PriorityBlockingQueue<>(100, priorityComparator.thenComparing(priorityComparator));
}
@Override
@@ -140,7 +143,7 @@ public class ExternalCompactionExecutor implements CompactionExecutor {
public int getCompactionsQueued(CType ctype) {
if (ctype != CType.EXTERNAL)
return 0;
- return queuedTask.size();
+ return queuedJob.size();
}
@Override
@@ -166,7 +169,7 @@ public class ExternalCompactionExecutor implements CompactionExecutor {
if (extJob.getJob().getPriority() >= priority) {
if (extJob.status.compareAndSet(Status.QUEUED, Status.RUNNING)) {
- queuedTask.remove(extJob);
+ queuedJob.remove(extJob);
var ecj = extJob.compactable.reserveExternalCompaction(extJob.csid, extJob.getJob(),
compactorId, externalCompactionId);
if (ecj == null) {
@@ -191,7 +194,7 @@ public class ExternalCompactionExecutor implements CompactionExecutor {
public Stream<TCompactionQueueSummary> summarize() {
HashSet<Short> uniqPrios = new HashSet<Short>();
- queuedTask.forEach(task -> uniqPrios.add(task.getJob().getPriority()));
+ queuedJob.forEach(job -> uniqPrios.add(job.getJob().getPriority()));
Stream<Short> prioStream = uniqPrios.stream();
@@ -212,13 +215,13 @@ public class ExternalCompactionExecutor implements CompactionExecutor {
@Override
public void compactableClosed(KeyExtent extent) {
- List<ExternalJob> taskToCancel;
- synchronized (queuedTask) {
- taskToCancel = queuedTask.stream().filter(ejob -> ejob.getExtent().equals(extent))
+ List<ExternalJob> jobToCancel;
+ synchronized (queuedJob) {
+ jobToCancel = queuedJob.stream().filter(ejob -> ejob.getExtent().equals(extent))
.collect(Collectors.toList());
}
- taskToCancel.forEach(task -> task.cancel(Status.QUEUED));
+ jobToCancel.forEach(job -> job.cancel(Status.QUEUED));
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
index cc7b66d..0ff836b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
@@ -47,6 +47,9 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+/**
+ * Runs compactions within the tserver.
+ */
public class InternalCompactionExecutor implements CompactionExecutor {
private static final Logger log = LoggerFactory.getLogger(InternalCompactionExecutor.class);
@@ -56,32 +59,32 @@ public class InternalCompactionExecutor implements CompactionExecutor {
private AtomicLong cancelCount = new AtomicLong();
private ThreadPoolExecutor threadPool;
- // This exist to provide an accurate count of queued compactions for metrics. The PriorityQueue is
+ // This set provides an accurate count of queued compactions for metrics. The PriorityQueue is
// not used because its size may be off due to it containing cancelled compactions. The collection
// below should not contain cancelled compactions. A concurrent set was not used because those do
// not have constant time size operations.
- private Set<CompactionTask> queuedTask = Collections.synchronizedSet(new HashSet<>());
+ private final Set<InternalJob> queuedJob = Collections.synchronizedSet(new HashSet<>());
- private AutoCloseable metricCloser;
+ private final AutoCloseable metricCloser;
- private RateLimiter readLimiter;
- private RateLimiter writeLimiter;
+ private final RateLimiter readLimiter;
+ private final RateLimiter writeLimiter;
- private class CompactionTask extends SubmittedJob implements Runnable {
+ private class InternalJob extends SubmittedJob implements Runnable {
- private AtomicReference<Status> status = new AtomicReference<>(Status.QUEUED);
- private Compactable compactable;
- private CompactionServiceId csid;
- private Consumer<Compactable> completionCallback;
+ private final AtomicReference<Status> status = new AtomicReference<>(Status.QUEUED);
+ private final Compactable compactable;
+ private final CompactionServiceId csid;
+ private final Consumer<Compactable> completionCallback;
private final long queuedTime;
- public CompactionTask(CompactionJob job, Compactable compactable, CompactionServiceId csid,
+ public InternalJob(CompactionJob job, Compactable compactable, CompactionServiceId csid,
Consumer<Compactable> completionCallback) {
super(job);
this.compactable = compactable;
this.csid = csid;
this.completionCallback = completionCallback;
- queuedTask.add(this);
+ queuedJob.add(this);
queuedTime = System.currentTimeMillis();
}
@@ -90,7 +93,7 @@ public class InternalCompactionExecutor implements CompactionExecutor {
try {
if (status.compareAndSet(Status.QUEUED, Status.RUNNING)) {
- queuedTask.remove(this);
+ queuedJob.remove(this);
compactable.compact(csid, getJob(), readLimiter, writeLimiter, queuedTime);
completionCallback.accept(compactable);
}
@@ -117,24 +120,24 @@ public class InternalCompactionExecutor implements CompactionExecutor {
}
if (canceled)
- queuedTask.remove(this);
+ queuedJob.remove(this);
if (canceled && cancelCount.incrementAndGet() % 1024 == 0) {
- // Occasionally clean the queue of canceled tasks that have hung around because of their low
+ // Occasionally clean the queue of canceled jobs that have hung around because of their low
// priority. This runs periodically, instead of every time something is canceled, to avoid
// hurting performance.
queue.removeIf(runnable -> {
- CompactionTask compactionTask;
+ InternalJob internalJob;
if (runnable instanceof TraceRunnable) {
runnable = ((TraceRunnable) runnable).getRunnable();
}
- if (runnable instanceof CompactionTask) {
- compactionTask = (CompactionTask) runnable;
+ if (runnable instanceof InternalJob) {
+ internalJob = (InternalJob) runnable;
} else {
throw new IllegalArgumentException(
"Unknown runnable type " + runnable.getClass().getName());
}
- return compactionTask.getStatus() == Status.CANCELED;
+ return internalJob.getStatus() == Status.CANCELED;
});
}
@@ -151,8 +154,8 @@ public class InternalCompactionExecutor implements CompactionExecutor {
return getJob(((TraceRunnable) r).getRunnable());
}
- if (r instanceof CompactionTask) {
- return ((CompactionTask) r).getJob();
+ if (r instanceof InternalJob) {
+ return ((InternalJob) r).getJob();
}
throw new IllegalArgumentException("Unknown runnable type " + r.getClass().getName());
@@ -170,7 +173,7 @@ public class InternalCompactionExecutor implements CompactionExecutor {
"compaction." + ceid, queue, OptionalInt.empty(), true);
metricCloser =
- ceMetrics.addExecutor(ceid, () -> threadPool.getActiveCount(), () -> queuedTask.size());
+ ceMetrics.addExecutor(ceid, () -> threadPool.getActiveCount(), () -> queuedJob.size());
this.readLimiter = readLimiter;
this.writeLimiter = writeLimiter;
@@ -182,9 +185,9 @@ public class InternalCompactionExecutor implements CompactionExecutor {
public SubmittedJob submit(CompactionServiceId csid, CompactionJob job, Compactable compactable,
Consumer<Compactable> completionCallback) {
Preconditions.checkArgument(job.getExecutor().equals(ceid));
- var ctask = new CompactionTask(job, compactable, csid, completionCallback);
- threadPool.execute(ctask);
- return ctask;
+ var internalJob = new InternalJob(job, compactable, csid, completionCallback);
+ threadPool.execute(internalJob);
+ return internalJob;
}
public void setThreads(int numThreads) {
@@ -216,7 +219,7 @@ public class InternalCompactionExecutor implements CompactionExecutor {
public int getCompactionsQueued(CType ctype) {
if (ctype != CType.INTERNAL)
return 0;
- return queuedTask.size();
+ return queuedJob.size();
}
@Override
@@ -232,12 +235,12 @@ public class InternalCompactionExecutor implements CompactionExecutor {
@Override
public void compactableClosed(KeyExtent extent) {
- List<CompactionTask> taskToCancel;
- synchronized (queuedTask) {
- taskToCancel = queuedTask.stream().filter(ejob -> ejob.getExtent().equals(extent))
+ List<InternalJob> jobToCancel;
+ synchronized (queuedJob) {
+ jobToCancel = queuedJob.stream().filter(job -> job.getExtent().equals(extent))
.collect(Collectors.toList());
}
- taskToCancel.forEach(task -> task.cancel(Status.QUEUED));
+ jobToCancel.forEach(job -> job.cancel(Status.QUEUED));
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/SubmittedJob.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/SubmittedJob.java
index 1335e34..657f8f0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/SubmittedJob.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/SubmittedJob.java
@@ -20,6 +20,9 @@ package org.apache.accumulo.tserver.compactions;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
+/**
+ * A submitted Compaction job, either internal or external.
+ */
public abstract class SubmittedJob {
private final CompactionJob job;