You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2021/09/14 11:53:05 UTC
[accumulo] branch main updated: Clarify compact method (#2268)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 0d24673 Clarify compact method (#2268)
0d24673 is described below
commit 0d246732c5fe5af67aa5a2be129f659106370685
Author: Mike Miller <mm...@apache.org>
AuthorDate: Tue Sep 14 07:52:58 2021 -0400
Clarify compact method (#2268)
* Make the compact method in CompactableUtils just run the FileCompactor and return
the majc stats, removing the need to mutate a CompactionStats object across method calls.
* Pass the CompactionInfo object as a param to reduce the number of params
to the compact method.
* Pull logic from the compact method and put it into a new bringOnline method, further
reducing the number of params to the compact method. The bringOnline method
returns the completed StoredTabletFile.
* Replace anonymous CompactionEnv classes with 2 new classes: MinCEnv & MajCEnv
* Rename CompactionEnvironment to ExtCEnv to differentiate from the
other types of CompactionEnv implementations
* Add javadoc comments describing methods
---
.../org/apache/accumulo/compactor/Compactor.java | 2 +-
.../{CompactionEnvironment.java => ExtCEnv.java} | 4 +-
.../accumulo/tserver/tablet/CompactableImpl.java | 32 ++++---
.../accumulo/tserver/tablet/CompactableUtils.java | 105 +++++----------------
.../apache/accumulo/tserver/tablet/MajCEnv.java} | 68 +++++--------
.../apache/accumulo/tserver/tablet/MinCEnv.java} | 68 +++++--------
.../accumulo/tserver/tablet/MinorCompactor.java | 57 +----------
.../apache/accumulo/test/ExternalCompactionIT.java | 2 +-
8 files changed, 95 insertions(+), 243 deletions(-)
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index c72e286..305de7f 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -542,7 +542,7 @@ public class Compactor extends AbstractServer implements CompactorService.Iface
job.getIteratorSettings().getIterators()
.forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis)));
- CompactionEnvironment cenv = new CompactionEnvironment(JOB_HOLDER, queueName);
+ ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, queueName);
FileCompactor compactor = new FileCompactor(getContext(), extent, files, outputFile,
job.isPropagateDeletes(), cenv, iters, tConfig);
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java
similarity index 96%
copy from server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
copy to server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java
index 6f99bb0..5c6062e 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java
@@ -36,7 +36,7 @@ import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
import com.google.common.annotations.VisibleForTesting;
-public class CompactionEnvironment implements CompactionEnv {
+public class ExtCEnv implements CompactionEnv {
private final CompactionJobHolder jobHolder;
private TExternalCompactionJob job;
@@ -58,7 +58,7 @@ public class CompactionEnvironment implements CompactionEnv {
}
}
- CompactionEnvironment(CompactionJobHolder jobHolder, String queueName) {
+ ExtCEnv(CompactionJobHolder jobHolder, String queueName) {
this.jobHolder = jobHolder;
this.job = jobHolder.getJob();
this.queueName = queueName;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index e64d53a..4345999 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -1031,7 +1031,7 @@ public class CompactableImpl implements Compactable {
}
}
- private static class CompactionInfo {
+ static class CompactionInfo {
Set<StoredTabletFile> jobFiles;
Long checkCompactionId = null;
boolean propagateDeletes = true;
@@ -1147,31 +1147,37 @@ public class CompactableImpl implements Compactable {
return;
var cInfo = ocInfo.get();
- StoredTabletFile metaFile = null;
+ StoredTabletFile newFile = null;
long startTime = System.currentTimeMillis();
- // create an empty stats object to be populated by CompactableUtils.compact()
+ CompactionKind kind = job.getKind();
+
CompactionStats stats = new CompactionStats();
try {
-
TabletLogger.compacting(getExtent(), job, cInfo.localCompactionCfg);
tablet.incrementStatusMajor();
+ var check = new CompactionCheck(service, kind, cInfo.checkCompactionId);
+ TabletFile tmpFileName = tablet.getNextMapFilenameForMajc(cInfo.propagateDeletes);
+ var compactEnv = new MajCEnv(kind, check, readLimiter, writeLimiter, cInfo.propagateDeletes);
+
+ SortedMap<StoredTabletFile,DataFileValue> allFiles = tablet.getDatafiles();
+ HashMap<StoredTabletFile,DataFileValue> compactFiles = new HashMap<>();
+ cInfo.jobFiles.forEach(file -> compactFiles.put(file, allFiles.get(file)));
- metaFile = CompactableUtils.compact(tablet, job, cInfo.jobFiles, cInfo.checkCompactionId,
- cInfo.selectedFiles, cInfo.propagateDeletes, cInfo.localHelper, cInfo.iters,
- new CompactionCheck(service, job.getKind(), cInfo.checkCompactionId), readLimiter,
- writeLimiter, stats);
+ stats = CompactableUtils.compact(tablet, job, cInfo, compactEnv, compactFiles, tmpFileName);
- TabletLogger.compacted(getExtent(), job, metaFile);
+ newFile = CompactableUtils.bringOnline(tablet.getDatafileManager(), cInfo, stats,
+ compactFiles, allFiles, kind, tmpFileName);
+ TabletLogger.compacted(getExtent(), job, newFile);
} catch (CompactionCanceledException cce) {
log.debug("Compaction canceled {} ", getExtent());
- metaFile = null;
+ newFile = null;
} catch (Exception e) {
- metaFile = null;
+ newFile = null;
throw new RuntimeException(e);
} finally {
- completeCompaction(job, cInfo.jobFiles, metaFile);
- tablet.updateTimer(MAJOR, queuedTime, startTime, stats.getEntriesRead(), metaFile == null);
+ completeCompaction(job, cInfo.jobFiles, newFile);
+ tablet.updateTimer(MAJOR, queuedTime, startTime, stats.getEntriesRead(), newFile == null);
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
index 49884cd..c625263 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -34,7 +33,6 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.accumulo.core.classloader.ClassLoaderUtil;
-import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.PluginEnvironment;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
@@ -58,7 +56,6 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.metadata.CompactableFileImpl;
import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -73,26 +70,21 @@ import org.apache.accumulo.core.summary.Gatherer;
import org.apache.accumulo.core.summary.SummarizerFactory;
import org.apache.accumulo.core.summary.SummaryCollection;
import org.apache.accumulo.core.summary.SummaryReader;
-import org.apache.accumulo.core.tabletserver.thrift.TCompactionReason;
import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
-import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServiceEnvironmentImpl;
+import org.apache.accumulo.server.compaction.CompactionInfo;
import org.apache.accumulo.server.compaction.CompactionStats;
import org.apache.accumulo.server.compaction.FileCompactor;
import org.apache.accumulo.server.compaction.FileCompactor.CompactionCanceledException;
import org.apache.accumulo.server.compaction.FileCompactor.CompactionEnv;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
-import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
import org.apache.accumulo.server.util.MetadataTableUtil;
import org.apache.accumulo.tserver.compaction.CompactionPlan;
import org.apache.accumulo.tserver.compaction.CompactionStrategy;
import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
import org.apache.accumulo.tserver.compaction.WriteParameters;
-import org.apache.accumulo.tserver.tablet.CompactableImpl.CompactionCheck;
import org.apache.accumulo.tserver.tablet.CompactableImpl.CompactionHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -552,88 +544,41 @@ public class CompactableUtils {
return copy;
}
- static StoredTabletFile compact(Tablet tablet, CompactionJob job, Set<StoredTabletFile> jobFiles,
- Long compactionId, Set<StoredTabletFile> selectedFiles, boolean propagateDeletes,
- CompactableImpl.CompactionHelper helper, List<IteratorSetting> iters,
- CompactionCheck compactionCheck, RateLimiter readLimiter, RateLimiter writeLimiter,
- CompactionStats stats) throws IOException, CompactionCanceledException {
- StoredTabletFile metaFile;
- CompactionEnv cenv = new CompactionEnv() {
- @Override
- public boolean isCompactionEnabled() {
- return compactionCheck.isCompactionEnabled();
- }
-
- @Override
- public IteratorScope getIteratorScope() {
- return IteratorScope.majc;
- }
-
- @Override
- public RateLimiter getReadLimiter() {
- return readLimiter;
- }
-
- @Override
- public RateLimiter getWriteLimiter() {
- return writeLimiter;
- }
-
- @Override
- public SystemIteratorEnvironment createIteratorEnv(ServerContext context,
- AccumuloConfiguration acuTableConf, TableId tableId) {
- return new TabletIteratorEnvironment(context, IteratorScope.majc, !propagateDeletes,
- acuTableConf, tableId, job.getKind());
- }
-
- @Override
- public SortedKeyValueIterator<Key,Value> getMinCIterator() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public TCompactionReason getReason() {
- switch (job.getKind()) {
- case USER:
- return TCompactionReason.USER;
- case CHOP:
- return TCompactionReason.CHOP;
- case SELECTOR:
- case SYSTEM:
- default:
- return TCompactionReason.SYSTEM;
- }
- }
- };
+ /**
+ * Create the FileCompactor and finally call compact. Returns the Major CompactionStats.
+ */
+ static CompactionStats compact(Tablet tablet, CompactionJob job,
+ CompactableImpl.CompactionInfo cInfo, CompactionEnv cenv,
+ Map<StoredTabletFile,DataFileValue> compactFiles, TabletFile tmpFileName)
+ throws IOException, CompactionCanceledException {
AccumuloConfiguration compactionConfig = getCompactionConfig(tablet.getTableConfiguration(),
- getOverrides(job.getKind(), tablet, helper, job.getFiles()));
-
- SortedMap<StoredTabletFile,DataFileValue> allFiles = tablet.getDatafiles();
- HashMap<StoredTabletFile,DataFileValue> compactFiles = new HashMap<>();
- jobFiles.forEach(file -> compactFiles.put(file, allFiles.get(file)));
-
- TabletFile compactTmpName = tablet.getNextMapFilenameForMajc(propagateDeletes);
+ getOverrides(job.getKind(), tablet, cInfo.localHelper, job.getFiles()));
FileCompactor compactor = new FileCompactor(tablet.getContext(), tablet.getExtent(),
- compactFiles, compactTmpName, propagateDeletes, cenv, iters, compactionConfig);
+ compactFiles, tmpFileName, cInfo.propagateDeletes, cenv, cInfo.iters, compactionConfig);
- var mcs = compactor.call();
+ return compactor.call();
+ }
- if (job.getKind() == CompactionKind.USER || job.getKind() == CompactionKind.SELECTOR) {
- helper.getFilesToDrop().forEach(f -> {
+ /**
+ * Finish major compaction by bringing the new file online and returning the completed file.
+ */
+ static StoredTabletFile bringOnline(DatafileManager datafileManager,
+ CompactableImpl.CompactionInfo cInfo, CompactionStats stats,
+ Map<StoredTabletFile,DataFileValue> compactFiles,
+ SortedMap<StoredTabletFile,DataFileValue> allFiles, CompactionKind kind,
+ TabletFile compactTmpName) throws IOException {
+ if (kind == CompactionKind.USER || kind == CompactionKind.SELECTOR) {
+ cInfo.localHelper.getFilesToDrop().forEach(f -> {
if (allFiles.containsKey(f)) {
compactFiles.put(f, allFiles.get(f));
}
});
}
- // mutate the empty stats to allow returning their values
- stats.add(mcs);
-
- metaFile = tablet.getDatafileManager().bringMajorCompactionOnline(compactFiles.keySet(),
- compactTmpName, compactionId, selectedFiles,
- new DataFileValue(mcs.getFileSize(), mcs.getEntriesWritten()), Optional.empty());
- return metaFile;
+ var dfv = new DataFileValue(stats.getFileSize(), stats.getEntriesWritten());
+ return datafileManager.bringMajorCompactionOnline(compactFiles.keySet(), compactTmpName,
+ cInfo.checkCompactionId, cInfo.selectedFiles, dfv, Optional.empty());
}
public static MajorCompactionReason from(CompactionKind ck) {
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MajCEnv.java
similarity index 57%
copy from server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
copy to server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MajCEnv.java
index 6f99bb0..5e1f678 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MajCEnv.java
@@ -16,80 +16,63 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.accumulo.compactor;
+package org.apache.accumulo.tserver.tablet;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.core.tabletserver.thrift.TCompactionReason;
-import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
-import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.compaction.FileCompactor.CompactionEnv;
+import org.apache.accumulo.server.compaction.FileCompactor;
import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
-import com.google.common.annotations.VisibleForTesting;
-
-public class CompactionEnvironment implements CompactionEnv {
-
- private final CompactionJobHolder jobHolder;
- private TExternalCompactionJob job;
- private String queueName;
-
- public static class CompactorIterEnv extends TabletIteratorEnvironment {
-
- private String queueName;
-
- public CompactorIterEnv(ServerContext context, IteratorScope scope, boolean fullMajC,
- AccumuloConfiguration tableConfig, TableId tableId, CompactionKind kind, String queueName) {
- super(context, scope, fullMajC, tableConfig, tableId, kind);
- this.queueName = queueName;
- }
-
- @VisibleForTesting
- public String getQueueName() {
- return queueName;
- }
- }
-
- CompactionEnvironment(CompactionJobHolder jobHolder, String queueName) {
- this.jobHolder = jobHolder;
- this.job = jobHolder.getJob();
- this.queueName = queueName;
+public class MajCEnv implements FileCompactor.CompactionEnv {
+ private final CompactionKind kind;
+ private final RateLimiter readLimiter;
+ private final RateLimiter writeLimiter;
+ private final boolean propagateDeletes;
+ private final CompactableImpl.CompactionCheck compactionCheck;
+
+ public MajCEnv(CompactionKind kind, CompactableImpl.CompactionCheck compactionCheck,
+ RateLimiter readLimiter, RateLimiter writeLimiter, boolean propagateDeletes) {
+ this.kind = kind;
+ this.readLimiter = readLimiter;
+ this.writeLimiter = writeLimiter;
+ this.propagateDeletes = propagateDeletes;
+ this.compactionCheck = compactionCheck;
}
@Override
public boolean isCompactionEnabled() {
- return !jobHolder.isCancelled();
+ return compactionCheck.isCompactionEnabled();
}
@Override
- public IteratorScope getIteratorScope() {
- return IteratorScope.majc;
+ public IteratorUtil.IteratorScope getIteratorScope() {
+ return IteratorUtil.IteratorScope.majc;
}
@Override
public RateLimiter getReadLimiter() {
- return NullRateLimiter.INSTANCE;
+ return readLimiter;
}
@Override
public RateLimiter getWriteLimiter() {
- return NullRateLimiter.INSTANCE;
+ return writeLimiter;
}
@Override
public SystemIteratorEnvironment createIteratorEnv(ServerContext context,
AccumuloConfiguration acuTableConf, TableId tableId) {
- return new CompactorIterEnv(context, IteratorScope.majc,
- !jobHolder.getJob().isPropagateDeletes(), acuTableConf, tableId,
- CompactionKind.valueOf(job.getKind().name()), queueName);
+ return new TabletIteratorEnvironment(context, IteratorUtil.IteratorScope.majc,
+ !propagateDeletes, acuTableConf, tableId, kind);
}
@Override
@@ -99,7 +82,7 @@ public class CompactionEnvironment implements CompactionEnv {
@Override
public TCompactionReason getReason() {
- switch (job.getKind()) {
+ switch (kind) {
case USER:
return TCompactionReason.USER;
case CHOP:
@@ -110,5 +93,4 @@ public class CompactionEnvironment implements CompactionEnv {
return TCompactionReason.SYSTEM;
}
}
-
}
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinCEnv.java
similarity index 52%
rename from server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
rename to server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinCEnv.java
index 6f99bb0..9299469 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinCEnv.java
@@ -16,99 +16,73 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.accumulo.compactor;
+package org.apache.accumulo.tserver.tablet;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.core.tabletserver.thrift.TCompactionReason;
-import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
-import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.compaction.FileCompactor.CompactionEnv;
+import org.apache.accumulo.server.compaction.FileCompactor;
import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
+import org.apache.accumulo.tserver.MinorCompactionReason;
-import com.google.common.annotations.VisibleForTesting;
+public class MinCEnv implements FileCompactor.CompactionEnv {
+ private final MinorCompactionReason reason;
+ private final SortedKeyValueIterator<Key,Value> iter;
-public class CompactionEnvironment implements CompactionEnv {
-
- private final CompactionJobHolder jobHolder;
- private TExternalCompactionJob job;
- private String queueName;
-
- public static class CompactorIterEnv extends TabletIteratorEnvironment {
-
- private String queueName;
-
- public CompactorIterEnv(ServerContext context, IteratorScope scope, boolean fullMajC,
- AccumuloConfiguration tableConfig, TableId tableId, CompactionKind kind, String queueName) {
- super(context, scope, fullMajC, tableConfig, tableId, kind);
- this.queueName = queueName;
- }
-
- @VisibleForTesting
- public String getQueueName() {
- return queueName;
- }
- }
-
- CompactionEnvironment(CompactionJobHolder jobHolder, String queueName) {
- this.jobHolder = jobHolder;
- this.job = jobHolder.getJob();
- this.queueName = queueName;
+ public MinCEnv(MinorCompactionReason reason, SortedKeyValueIterator<Key,Value> iter) {
+ this.reason = reason;
+ this.iter = iter;
}
@Override
public boolean isCompactionEnabled() {
- return !jobHolder.isCancelled();
+ return true;
}
@Override
- public IteratorScope getIteratorScope() {
- return IteratorScope.majc;
+ public IteratorUtil.IteratorScope getIteratorScope() {
+ return IteratorUtil.IteratorScope.minc;
}
@Override
public RateLimiter getReadLimiter() {
- return NullRateLimiter.INSTANCE;
+ return null;
}
@Override
public RateLimiter getWriteLimiter() {
- return NullRateLimiter.INSTANCE;
+ return null;
}
@Override
public SystemIteratorEnvironment createIteratorEnv(ServerContext context,
AccumuloConfiguration acuTableConf, TableId tableId) {
- return new CompactorIterEnv(context, IteratorScope.majc,
- !jobHolder.getJob().isPropagateDeletes(), acuTableConf, tableId,
- CompactionKind.valueOf(job.getKind().name()), queueName);
+ return new TabletIteratorEnvironment(context, IteratorUtil.IteratorScope.minc, acuTableConf,
+ tableId);
}
@Override
public SortedKeyValueIterator<Key,Value> getMinCIterator() {
- throw new UnsupportedOperationException();
+ return iter;
}
@Override
public TCompactionReason getReason() {
- switch (job.getKind()) {
+ switch (reason) {
case USER:
return TCompactionReason.USER;
- case CHOP:
- return TCompactionReason.CHOP;
- case SELECTOR:
+ case CLOSE:
+ return TCompactionReason.CLOSE;
case SYSTEM:
default:
return TCompactionReason.SYSTEM;
}
}
-
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
index c49f6d5..ab0f75d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
@@ -31,22 +31,12 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.clientImpl.Tables;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.ByteSequence;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.TabletFile;
-import org.apache.accumulo.core.tabletserver.thrift.TCompactionReason;
import org.apache.accumulo.core.util.LocalityGroupUtil;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
-import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.compaction.CompactionStats;
import org.apache.accumulo.server.compaction.FileCompactor;
import org.apache.accumulo.server.conf.TableConfiguration;
-import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
-import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
import org.apache.accumulo.server.problems.ProblemReport;
import org.apache.accumulo.server.problems.ProblemReports;
import org.apache.accumulo.server.problems.ProblemType;
@@ -67,52 +57,7 @@ public class MinorCompactor extends FileCompactor {
public MinorCompactor(TabletServer tabletServer, Tablet tablet, InMemoryMap imm,
TabletFile outputFile, MinorCompactionReason mincReason, TableConfiguration tableConfig) {
super(tabletServer.getContext(), tablet.getExtent(), Collections.emptyMap(), outputFile, true,
- new CompactionEnv() {
- @Override
- public boolean isCompactionEnabled() {
- return true;
- }
-
- @Override
- public IteratorScope getIteratorScope() {
- return IteratorScope.minc;
- }
-
- @Override
- public RateLimiter getReadLimiter() {
- return null;
- }
-
- @Override
- public RateLimiter getWriteLimiter() {
- return null;
- }
-
- @Override
- public SystemIteratorEnvironment createIteratorEnv(ServerContext context,
- AccumuloConfiguration acuTableConf, TableId tableId) {
- return new TabletIteratorEnvironment(context, IteratorScope.minc, acuTableConf,
- tableId);
- }
-
- @Override
- public SortedKeyValueIterator<Key,Value> getMinCIterator() {
- return imm.compactionIterator();
- }
-
- @Override
- public TCompactionReason getReason() {
- switch (mincReason) {
- case USER:
- return TCompactionReason.USER;
- case CLOSE:
- return TCompactionReason.CLOSE;
- case SYSTEM:
- default:
- return TCompactionReason.SYSTEM;
- }
- }
- }, Collections.emptyList(), tableConfig);
+ new MinCEnv(mincReason, imm.compactionIterator()), Collections.emptyList(), tableConfig);
this.tabletServer = tabletServer;
this.mincReason = mincReason;
}
diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index b39098d..a6ca80f 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -53,8 +53,8 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.apache.accumulo.compactor.CompactionEnvironment.CompactorIterEnv;
import org.apache.accumulo.compactor.Compactor;
+import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv;
import org.apache.accumulo.coordinator.CompactionCoordinator;
import org.apache.accumulo.coordinator.ExternalCompactionMetrics;
import org.apache.accumulo.core.client.Accumulo;