You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2021/03/12 22:00:27 UTC
[accumulo] branch 1451-external-compactions-feature updated: Adds external compaction id type
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
new e62773c Adds external compaction id type
e62773c is described below
commit e62773c04249211c4a1b7a817c58e5ea74ec96fa
Author: Keith Turner <kt...@apache.org>
AuthorDate: Fri Mar 12 16:34:59 2021 -0500
Adds external compaction id type
---
.../server/compaction/ExternalCompactionId.java | 59 ++++++++++++++++++++++
.../coordinator/CompactionCoordinator.java | 20 ++++----
.../org/apache/accumulo/compactor/Compactor.java | 2 +-
.../accumulo/tserver/ThriftClientHandler.java | 7 +--
.../accumulo/tserver/compactions/Compactable.java | 4 +-
.../tserver/compactions/CompactionManager.java | 11 ++--
.../tserver/compactions/ExternalCompactionJob.java | 10 ++--
.../accumulo/tserver/tablet/CompactableImpl.java | 9 ++--
8 files changed, 93 insertions(+), 29 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionId.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionId.java
new file mode 100644
index 0000000..e6880c4
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionId.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.compaction;
+
+import java.util.UUID;
+
+import org.apache.accumulo.core.data.AbstractId;
+
+public class ExternalCompactionId extends AbstractId<ExternalCompactionId> {
+
+ // A common prefix is nice when grepping logs for external compaction ids. The prefix also serves
+ // as a nice sanity check on data coming in over the network and from persistent storage.
+ private static final String PREFIX = "ECID:";
+
+ private ExternalCompactionId(UUID uuid) {
+ super(PREFIX + uuid);
+ }
+
+ private ExternalCompactionId(String id) {
+ super(id);
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ public static ExternalCompactionId generate() {
+ return new ExternalCompactionId(UUID.randomUUID());
+ }
+
+ public static ExternalCompactionId of(String id) {
+ if (!id.startsWith(PREFIX)) {
+ throw new IllegalArgumentException("Not a valid external compaction id " + id);
+ }
+
+ try {
+ UUID.fromString(id.substring(PREFIX.length()));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Not a valid external compaction id " + id, e);
+ }
+
+ return new ExternalCompactionId(id);
+ }
+
+}
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index bc5d4c6..d72bfaa 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -56,6 +56,7 @@ import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.GarbageCollectionLogger;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.compaction.ExternalCompactionId;
import org.apache.accumulo.server.compaction.ExternalCompactionUtil;
import org.apache.accumulo.server.compaction.RetryableThriftCall;
import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
@@ -86,7 +87,8 @@ public class CompactionCoordinator extends AbstractServer
/* index of tserver to queue and priority, exists to provide O(1) lookup into QUEUES */
private static final Map<TServerInstance,HashSet<QueueAndPriority>> INDEX = new HashMap<>();
/* Map of compactionId to RunningCompactions */
- private static final Map<String,RunningCompaction> RUNNING = new ConcurrentHashMap<>();
+ private static final Map<ExternalCompactionId,RunningCompaction> RUNNING =
+ new ConcurrentHashMap<>();
private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
private final AccumuloConfiguration aconf;
@@ -311,10 +313,9 @@ public class CompactionCoordinator extends AbstractServer
client = getTabletServerConnection(tserver);
TExternalCompactionJob job = client.reserveCompactionJob(TraceUtil.traceInfo(),
getContext().rpcCreds(), queue, priority, compactorAddress);
- RUNNING.put(job.getExternalCompactionId(),
+ RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()),
new RunningCompaction(job, compactorAddress, tserver));
- LOG.debug(
- "Returning external job id:" + job.externalCompactionId + " to " + compactorAddress);
+ LOG.debug("Returning external job {} to {}", job.externalCompactionId, compactorAddress);
return job;
} finally {
ThriftUtil.returnClient(client);
@@ -341,7 +342,7 @@ public class CompactionCoordinator extends AbstractServer
*/
@Override
public void cancelCompaction(String externalCompactionId) throws TException {
- RunningCompaction rc = RUNNING.get(externalCompactionId);
+ RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId));
if (null == rc) {
throw new UnknownCompactionIdException();
}
@@ -371,7 +372,7 @@ public class CompactionCoordinator extends AbstractServer
@Override
public List<Status> getCompactionStatus(String externalCompactionId) throws TException {
- RunningCompaction rc = RUNNING.get(externalCompactionId);
+ RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId));
if (null == rc) {
throw new UnknownCompactionIdException();
}
@@ -394,7 +395,8 @@ public class CompactionCoordinator extends AbstractServer
@Override
public void compactionCompleted(String externalCompactionId, CompactionStats stats)
throws TException {
- RunningCompaction rc = RUNNING.get(externalCompactionId);
+ var ecid = ExternalCompactionId.of(externalCompactionId);
+ RunningCompaction rc = RUNNING.get(ecid);
if (null != rc) {
rc.setStats(stats);
} else {
@@ -414,7 +416,7 @@ public class CompactionCoordinator extends AbstractServer
client = getTabletServerConnection(rc.getTserver());
client.compactionJobFinished(TraceUtil.traceInfo(), getContext().rpcCreds(),
externalCompactionId, stats.fileSize, stats.entriesWritten);
- RUNNING.remove(externalCompactionId, rc);
+ RUNNING.remove(ecid, rc);
return null;
} catch (TException e) {
throw e;
@@ -462,7 +464,7 @@ public class CompactionCoordinator extends AbstractServer
@Override
public void updateCompactionStatus(String externalCompactionId, CompactionState state,
String message, long timestamp) throws TException {
- RunningCompaction rc = RUNNING.get(externalCompactionId);
+ RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId));
if (null != rc) {
rc.addUpdate(timestamp, message, state);
} else {
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 73e790d..b86de52 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -407,7 +407,7 @@ public class Compactor extends AbstractServer
cs.setEntriesWritten(stat.getEntriesWritten());
cs.setFileSize(stat.getFileSize());
jobHolder.setStats(cs);
- LOG.info("Compaction completed successfully");
+ LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId());
// Update state when completed
updateCompactionState(job, CompactionState.SUCCEEDED,
"Compaction completed successfully");
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
index e1897b2..f640e37 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
@@ -34,7 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -123,6 +122,7 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil;
import org.apache.accumulo.server.client.ClientServiceHandler;
import org.apache.accumulo.server.compaction.CompactionInfo;
import org.apache.accumulo.server.compaction.Compactor;
+import org.apache.accumulo.server.compaction.ExternalCompactionId;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.data.ServerMutation;
import org.apache.accumulo.server.fs.TooManyFilesException;
@@ -1701,8 +1701,9 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
}
- server.getCompactionManager().commitExternalCompaction(UUID.fromString(externalCompactionId),
- server.getOnlineTablets(), fileSize, entries);
+ server.getCompactionManager().commitExternalCompaction(
+ ExternalCompactionId.of(externalCompactionId), server.getOnlineTablets(), fileSize,
+ entries);
}
@Override
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java
index cf92d8d..f8cc442 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java
@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
-import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
@@ -37,6 +36,7 @@ import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.accumulo.server.compaction.ExternalCompactionId;
/**
* Interface between compaction service and tablet.
@@ -93,5 +93,5 @@ public interface Compactable {
ExternalCompactionJob reserveExternalCompaction(CompactionServiceId service, CompactionJob job,
String compactorId);
- void commitExternalCompaction(UUID extCompactionId, long fileSize, long entries);
+ void commitExternalCompaction(ExternalCompactionId extCompactionId, long fileSize, long entries);
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index 392c7c1..db4d97d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -42,6 +41,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.fate.util.Retry;
import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.compaction.ExternalCompactionId;
import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics;
import org.apache.accumulo.tserver.tablet.Tablet;
import org.slf4j.Logger;
@@ -75,7 +75,7 @@ public class CompactionManager {
private Map<CompactionExecutorId,ExternalCompactionExecutor> externalExecutors;
// TODO this may need to be garbage collected... also will need to be populated when tablet load
- private Map<UUID,KeyExtent> runningExternalCompactions;
+ private Map<ExternalCompactionId,KeyExtent> runningExternalCompactions;
private class Config {
Map<String,String> planners = new HashMap<>();
@@ -405,6 +405,7 @@ public class CompactionManager {
return services.values().stream().mapToInt(CompactionService::getCompactionsQueued).sum();
}
+ // CBUG would be nice to create a CompactorId type and use that instead of string.
public ExternalCompactionJob reserveExternalCompaction(String queueName, long priority,
String compactorId) {
log.debug("Attempting to reserved external compaction queue:{} priority:{} compactor:{}",
@@ -414,7 +415,7 @@ public class CompactionManager {
var ecJob = extCE.reserveExternalCompaction(priority, compactorId);
if (ecJob != null) {
runningExternalCompactions.put(ecJob.getExternalCompactionId(), ecJob.getExtent());
- log.debug("Reserved external compaction ecid:{}", ecJob.getExternalCompactionId());
+ log.debug("Reserved external compaction {}", ecJob.getExternalCompactionId());
}
return ecJob;
}
@@ -427,8 +428,8 @@ public class CompactionManager {
return getExternalExecutor(CompactionExecutorId.externalId(queueName));
}
- public void commitExternalCompaction(UUID extCompactionId, Map<KeyExtent,Tablet> currentTablets,
- long fileSize, long entries) {
+ public void commitExternalCompaction(ExternalCompactionId extCompactionId,
+ Map<KeyExtent,Tablet> currentTablets, long fileSize, long entries) {
KeyExtent extent = runningExternalCompactions.get(extCompactionId);
if (extent != null) {
Tablet tablet = currentTablets.get(extent);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
index 4e30ea3..e2b04d9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.tserver.compactions;
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.IteratorSetting;
@@ -35,6 +34,7 @@ import org.apache.accumulo.core.tabletserver.thrift.CompactionType;
import org.apache.accumulo.core.tabletserver.thrift.InputFile;
import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.server.compaction.ExternalCompactionId;
public class ExternalCompactionJob {
@@ -42,14 +42,14 @@ public class ExternalCompactionJob {
private boolean propogateDeletes;
private TabletFile compactTmpName;
private KeyExtent extent;
- private UUID externalCompactionId;
+ private ExternalCompactionId externalCompactionId;
private long priority;
private CompactionKind kind;
private List<IteratorSetting> iters;
public ExternalCompactionJob(Set<StoredTabletFile> jobFiles, boolean propogateDeletes,
- TabletFile compactTmpName, KeyExtent extent, UUID externalCompactionId, long priority,
- CompactionKind kind, List<IteratorSetting> iters) {
+ TabletFile compactTmpName, KeyExtent extent, ExternalCompactionId externalCompactionId,
+ long priority, CompactionKind kind, List<IteratorSetting> iters) {
this.jobFiles = Objects.requireNonNull(jobFiles);
this.propogateDeletes = propogateDeletes;
this.compactTmpName = Objects.requireNonNull(compactTmpName);
@@ -99,7 +99,7 @@ public class ExternalCompactionJob {
org.apache.accumulo.core.tabletserver.thrift.CompactionKind.valueOf(kind.name()));
}
- public UUID getExternalCompactionId() {
+ public ExternalCompactionId getExternalCompactionId() {
return externalCompactionId;
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index 3d1f5d5..ecd7083 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -30,7 +30,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -61,6 +60,7 @@ import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.accumulo.server.ServiceEnvironmentImpl;
import org.apache.accumulo.server.compaction.CompactionStats;
import org.apache.accumulo.server.compaction.Compactor.CompactionCanceledException;
+import org.apache.accumulo.server.compaction.ExternalCompactionId;
import org.apache.accumulo.server.util.MetadataTableUtil;
import org.apache.accumulo.tserver.compactions.Compactable;
import org.apache.accumulo.tserver.compactions.CompactionManager;
@@ -721,7 +721,7 @@ public class CompactableImpl implements Compactable {
}
// TODO move to top of class
- private Map<UUID,CompactionInfo> externalCompactions = new ConcurrentHashMap<>();
+ private Map<ExternalCompactionId,CompactionInfo> externalCompactions = new ConcurrentHashMap<>();
@Override
public ExternalCompactionJob reserveExternalCompaction(CompactionServiceId service,
@@ -737,7 +737,7 @@ public class CompactableImpl implements Compactable {
cInfo.newFile = tablet.getNextMapFilename(!cInfo.propogateDeletes ? "A" : "C");
cInfo.compactTmpName = new TabletFile(new Path(cInfo.newFile.getMetaInsert() + "_tmp"));
- UUID externalCompactionId = UUID.randomUUID();
+ ExternalCompactionId externalCompactionId = ExternalCompactionId.generate();
cInfo.job = job;
@@ -755,7 +755,8 @@ public class CompactableImpl implements Compactable {
}
@Override
- public void commitExternalCompaction(UUID extCompactionId, long fileSize, long entries) {
+ public void commitExternalCompaction(ExternalCompactionId extCompactionId, long fileSize,
+ long entries) {
// CBUG double check w/ java docs that only one thread can remove
CompactionInfo cInfo = externalCompactions.remove(extCompactionId);