You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2021/03/12 22:00:27 UTC

[accumulo] branch 1451-external-compactions-feature updated: Adds external compaction id type

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new e62773c  Adds external compaction id type
e62773c is described below

commit e62773c04249211c4a1b7a817c58e5ea74ec96fa
Author: Keith Turner <kt...@apache.org>
AuthorDate: Fri Mar 12 16:34:59 2021 -0500

    Adds external compaction id type
---
 .../server/compaction/ExternalCompactionId.java    | 59 ++++++++++++++++++++++
 .../coordinator/CompactionCoordinator.java         | 20 ++++----
 .../org/apache/accumulo/compactor/Compactor.java   |  2 +-
 .../accumulo/tserver/ThriftClientHandler.java      |  7 +--
 .../accumulo/tserver/compactions/Compactable.java  |  4 +-
 .../tserver/compactions/CompactionManager.java     | 11 ++--
 .../tserver/compactions/ExternalCompactionJob.java | 10 ++--
 .../accumulo/tserver/tablet/CompactableImpl.java   |  9 ++--
 8 files changed, 93 insertions(+), 29 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionId.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionId.java
new file mode 100644
index 0000000..e6880c4
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionId.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.compaction;
+
+import java.util.UUID;
+
+import org.apache.accumulo.core.data.AbstractId;
+
+public class ExternalCompactionId extends AbstractId<ExternalCompactionId> {
+  // Id text has the form PREFIX + UUID string; instances come only from generate() and of().
+  // A common prefix is nice when grepping logs for external compaction ids. The prefix also serves
+  // as a nice sanity check on data coming in over the network and from persistent storage.
+  private static final String PREFIX = "ECID:";
+  // Used by generate(): prepends PREFIX to the string form of the given UUID.
+  private ExternalCompactionId(UUID uuid) {
+    super(PREFIX + uuid);
+  }
+  // Used by of(): passes the already-validated id string to the superclass unchanged.
+  private ExternalCompactionId(String id) {
+    super(id);
+  }
+
+  private static final long serialVersionUID = 1L;
+  // Returns a new id built from UUID.randomUUID().
+  public static ExternalCompactionId generate() {
+    return new ExternalCompactionId(UUID.randomUUID());
+  }
+  // Wraps an id string; throws IllegalArgumentException on a missing PREFIX or unparsable UUID.
+  public static ExternalCompactionId of(String id) {
+    if (!id.startsWith(PREFIX)) {
+      throw new IllegalArgumentException("Not a valid external compaction id " + id);
+    }
+    // Parsed only as a validity check; the resulting UUID is discarded.
+    try {
+      UUID.fromString(id.substring(PREFIX.length()));
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("Not a valid external compaction id " + id, e);
+    }
+
+    return new ExternalCompactionId(id);
+  }
+
+}
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index bc5d4c6..d72bfaa 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -56,6 +56,7 @@ import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.compaction.ExternalCompactionId;
 import org.apache.accumulo.server.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.server.compaction.RetryableThriftCall;
 import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
@@ -86,7 +87,8 @@ public class CompactionCoordinator extends AbstractServer
   /* index of tserver to queue and priority, exists to provide O(1) lookup into QUEUES */
   private static final Map<TServerInstance,HashSet<QueueAndPriority>> INDEX = new HashMap<>();
   /* Map of compactionId to RunningCompactions */
-  private static final Map<String,RunningCompaction> RUNNING = new ConcurrentHashMap<>();
+  private static final Map<ExternalCompactionId,RunningCompaction> RUNNING =
+      new ConcurrentHashMap<>();
 
   private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
   private final AccumuloConfiguration aconf;
@@ -311,10 +313,9 @@ public class CompactionCoordinator extends AbstractServer
       client = getTabletServerConnection(tserver);
       TExternalCompactionJob job = client.reserveCompactionJob(TraceUtil.traceInfo(),
           getContext().rpcCreds(), queue, priority, compactorAddress);
-      RUNNING.put(job.getExternalCompactionId(),
+      RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()),
           new RunningCompaction(job, compactorAddress, tserver));
-      LOG.debug(
-          "Returning external job id:" + job.externalCompactionId + " to " + compactorAddress);
+      LOG.debug("Returning external job {} to {}", job.externalCompactionId, compactorAddress);
       return job;
     } finally {
       ThriftUtil.returnClient(client);
@@ -341,7 +342,7 @@ public class CompactionCoordinator extends AbstractServer
    */
   @Override
   public void cancelCompaction(String externalCompactionId) throws TException {
-    RunningCompaction rc = RUNNING.get(externalCompactionId);
+    RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId));
     if (null == rc) {
       throw new UnknownCompactionIdException();
     }
@@ -371,7 +372,7 @@ public class CompactionCoordinator extends AbstractServer
 
   @Override
   public List<Status> getCompactionStatus(String externalCompactionId) throws TException {
-    RunningCompaction rc = RUNNING.get(externalCompactionId);
+    RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId));
     if (null == rc) {
       throw new UnknownCompactionIdException();
     }
@@ -394,7 +395,8 @@ public class CompactionCoordinator extends AbstractServer
   @Override
   public void compactionCompleted(String externalCompactionId, CompactionStats stats)
       throws TException {
-    RunningCompaction rc = RUNNING.get(externalCompactionId);
+    var ecid = ExternalCompactionId.of(externalCompactionId);
+    RunningCompaction rc = RUNNING.get(ecid);
     if (null != rc) {
       rc.setStats(stats);
     } else {
@@ -414,7 +416,7 @@ public class CompactionCoordinator extends AbstractServer
               client = getTabletServerConnection(rc.getTserver());
               client.compactionJobFinished(TraceUtil.traceInfo(), getContext().rpcCreds(),
                   externalCompactionId, stats.fileSize, stats.entriesWritten);
-              RUNNING.remove(externalCompactionId, rc);
+              RUNNING.remove(ecid, rc);
               return null;
             } catch (TException e) {
               throw e;
@@ -462,7 +464,7 @@ public class CompactionCoordinator extends AbstractServer
   @Override
   public void updateCompactionStatus(String externalCompactionId, CompactionState state,
       String message, long timestamp) throws TException {
-    RunningCompaction rc = RUNNING.get(externalCompactionId);
+    RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId));
     if (null != rc) {
       rc.addUpdate(timestamp, message, state);
     } else {
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 73e790d..b86de52 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -407,7 +407,7 @@ public class Compactor extends AbstractServer
           cs.setEntriesWritten(stat.getEntriesWritten());
           cs.setFileSize(stat.getFileSize());
           jobHolder.setStats(cs);
-          LOG.info("Compaction completed successfully");
+          LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId());
           // Update state when completed
           updateCompactionState(job, CompactionState.SUCCEEDED,
               "Compaction completed successfully");
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
index e1897b2..f640e37 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
@@ -34,7 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -123,6 +122,7 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.server.client.ClientServiceHandler;
 import org.apache.accumulo.server.compaction.CompactionInfo;
 import org.apache.accumulo.server.compaction.Compactor;
+import org.apache.accumulo.server.compaction.ExternalCompactionId;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.data.ServerMutation;
 import org.apache.accumulo.server.fs.TooManyFilesException;
@@ -1701,8 +1701,9 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe
           SecurityErrorCode.PERMISSION_DENIED).asThriftException();
     }
 
-    server.getCompactionManager().commitExternalCompaction(UUID.fromString(externalCompactionId),
-        server.getOnlineTablets(), fileSize, entries);
+    server.getCompactionManager().commitExternalCompaction(
+        ExternalCompactionId.of(externalCompactionId), server.getOnlineTablets(), fileSize,
+        entries);
   }
 
   @Override
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java
index cf92d8d..f8cc442 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
-import java.util.UUID;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
@@ -37,6 +36,7 @@ import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
 import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.accumulo.server.compaction.ExternalCompactionId;
 
 /**
  * Interface between compaction service and tablet.
@@ -93,5 +93,5 @@ public interface Compactable {
   ExternalCompactionJob reserveExternalCompaction(CompactionServiceId service, CompactionJob job,
       String compactorId);
 
-  void commitExternalCompaction(UUID extCompactionId, long fileSize, long entries);
+  void commitExternalCompaction(ExternalCompactionId extCompactionId, long fileSize, long entries);
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index 392c7c1..db4d97d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -42,6 +41,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.fate.util.Retry;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.compaction.ExternalCompactionId;
 import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics;
 import org.apache.accumulo.tserver.tablet.Tablet;
 import org.slf4j.Logger;
@@ -75,7 +75,7 @@ public class CompactionManager {
   private Map<CompactionExecutorId,ExternalCompactionExecutor> externalExecutors;
 
   // TODO this may need to be garbage collected... also will need to be populated when tablet load
-  private Map<UUID,KeyExtent> runningExternalCompactions;
+  private Map<ExternalCompactionId,KeyExtent> runningExternalCompactions;
 
   private class Config {
     Map<String,String> planners = new HashMap<>();
@@ -405,6 +405,7 @@ public class CompactionManager {
     return services.values().stream().mapToInt(CompactionService::getCompactionsQueued).sum();
   }
 
+  // CBUG would be nice to create a CompactorId type and use that instead of string.
   public ExternalCompactionJob reserveExternalCompaction(String queueName, long priority,
       String compactorId) {
     log.debug("Attempting to reserved external compaction queue:{} priority:{} compactor:{}",
@@ -414,7 +415,7 @@ public class CompactionManager {
     var ecJob = extCE.reserveExternalCompaction(priority, compactorId);
     if (ecJob != null) {
       runningExternalCompactions.put(ecJob.getExternalCompactionId(), ecJob.getExtent());
-      log.debug("Reserved external compaction ecid:{}", ecJob.getExternalCompactionId());
+      log.debug("Reserved external compaction {}", ecJob.getExternalCompactionId());
     }
     return ecJob;
   }
@@ -427,8 +428,8 @@ public class CompactionManager {
     return getExternalExecutor(CompactionExecutorId.externalId(queueName));
   }
 
-  public void commitExternalCompaction(UUID extCompactionId, Map<KeyExtent,Tablet> currentTablets,
-      long fileSize, long entries) {
+  public void commitExternalCompaction(ExternalCompactionId extCompactionId,
+      Map<KeyExtent,Tablet> currentTablets, long fileSize, long entries) {
     KeyExtent extent = runningExternalCompactions.get(extCompactionId);
     if (extent != null) {
       Tablet tablet = currentTablets.get(extent);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
index 4e30ea3..e2b04d9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.tserver.compactions;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.UUID;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.IteratorSetting;
@@ -35,6 +34,7 @@ import org.apache.accumulo.core.tabletserver.thrift.CompactionType;
 import org.apache.accumulo.core.tabletserver.thrift.InputFile;
 import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
 import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.server.compaction.ExternalCompactionId;
 
 public class ExternalCompactionJob {
 
@@ -42,14 +42,14 @@ public class ExternalCompactionJob {
   private boolean propogateDeletes;
   private TabletFile compactTmpName;
   private KeyExtent extent;
-  private UUID externalCompactionId;
+  private ExternalCompactionId externalCompactionId;
   private long priority;
   private CompactionKind kind;
   private List<IteratorSetting> iters;
 
   public ExternalCompactionJob(Set<StoredTabletFile> jobFiles, boolean propogateDeletes,
-      TabletFile compactTmpName, KeyExtent extent, UUID externalCompactionId, long priority,
-      CompactionKind kind, List<IteratorSetting> iters) {
+      TabletFile compactTmpName, KeyExtent extent, ExternalCompactionId externalCompactionId,
+      long priority, CompactionKind kind, List<IteratorSetting> iters) {
     this.jobFiles = Objects.requireNonNull(jobFiles);
     this.propogateDeletes = propogateDeletes;
     this.compactTmpName = Objects.requireNonNull(compactTmpName);
@@ -99,7 +99,7 @@ public class ExternalCompactionJob {
         org.apache.accumulo.core.tabletserver.thrift.CompactionKind.valueOf(kind.name()));
   }
 
-  public UUID getExternalCompactionId() {
+  public ExternalCompactionId getExternalCompactionId() {
     return externalCompactionId;
   }
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index 3d1f5d5..ecd7083 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -30,7 +30,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -61,6 +60,7 @@ import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.accumulo.server.ServiceEnvironmentImpl;
 import org.apache.accumulo.server.compaction.CompactionStats;
 import org.apache.accumulo.server.compaction.Compactor.CompactionCanceledException;
+import org.apache.accumulo.server.compaction.ExternalCompactionId;
 import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.accumulo.tserver.compactions.Compactable;
 import org.apache.accumulo.tserver.compactions.CompactionManager;
@@ -721,7 +721,7 @@ public class CompactableImpl implements Compactable {
   }
 
   // TODO move to top of class
-  private Map<UUID,CompactionInfo> externalCompactions = new ConcurrentHashMap<>();
+  private Map<ExternalCompactionId,CompactionInfo> externalCompactions = new ConcurrentHashMap<>();
 
   @Override
   public ExternalCompactionJob reserveExternalCompaction(CompactionServiceId service,
@@ -737,7 +737,7 @@ public class CompactableImpl implements Compactable {
       cInfo.newFile = tablet.getNextMapFilename(!cInfo.propogateDeletes ? "A" : "C");
       cInfo.compactTmpName = new TabletFile(new Path(cInfo.newFile.getMetaInsert() + "_tmp"));
 
-      UUID externalCompactionId = UUID.randomUUID();
+      ExternalCompactionId externalCompactionId = ExternalCompactionId.generate();
 
       cInfo.job = job;
 
@@ -755,7 +755,8 @@ public class CompactableImpl implements Compactable {
   }
 
   @Override
-  public void commitExternalCompaction(UUID extCompactionId, long fileSize, long entries) {
+  public void commitExternalCompaction(ExternalCompactionId extCompactionId, long fileSize,
+      long entries) {
     // CBUG double check w/ java docs that only one thread can remove
     CompactionInfo cInfo = externalCompactions.remove(extCompactionId);