You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/03/16 19:35:03 UTC

[accumulo] branch 1451-external-compactions-feature updated: Updates from testing

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new b0317c7  Updates from testing
b0317c7 is described below

commit b0317c77a11e78f0683ddbe4b0f88d1c54b66f55
Author: Dave Marion <dl...@apache.org>
AuthorDate: Tue Mar 16 19:33:12 2021 +0000

    Updates from testing
    
    Added SharedRateLimiterFactory::remove to remove the compaction rate
    limiters from the active set when we are done, preventing the
    background threads from getting an NPE. Modified the
    CompactionCoordinator::getCompactionJob loop to try another server if
    the first one does not have a job. Modified Thrift calls so that they
    no longer return null and instead return an empty object. Changed all
    RetryableThriftCall<Void> to RetryableThriftCall<String>, because a
    Void function can only return null, which caused the function to be
    retried even after a successful call.
---
 .../util/ratelimit/SharedRateLimiterFactory.java   |  14 ++
 .../coordinator/CompactionCoordinator.java         | 235 ++++++++++-----------
 .../accumulo/coordinator/RunningCompaction.java    |  12 +-
 .../accumulo/compactor/CompactionEnvironment.java  |  19 +-
 .../org/apache/accumulo/compactor/Compactor.java   | 104 +++++----
 .../accumulo/tserver/ThriftClientHandler.java      |   3 +-
 .../tserver/compactions/ExternalCompactionJob.java |   2 +
 .../accumulo/tserver/tablet/CompactableImpl.java   |   2 +
 8 files changed, 201 insertions(+), 190 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
index c24a3d2..accfd95 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
@@ -99,6 +99,20 @@ public class SharedRateLimiterFactory {
   }
 
   /**
+   * Remove the rate limiter from the set of active limiters, if it exists
+   *
+   * @param name
+   *          key for the rate limiter
+   */
+  public void remove(String name) {
+    synchronized (activeLimiters) {
+      if (activeLimiters.containsKey(name)) {
+        activeLimiters.remove(name);
+      }
+    }
+  }
+
+  /**
    * Walk through all of the currently active RateLimiters, having each update its current rate.
    * This is called periodically so that we can dynamically update as configuration changes.
    */
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 5ede596..b4b2cb4 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -189,6 +189,9 @@ public class CompactionCoordinator extends AbstractServer
     // TODO: On initial startup contact all running tservers to get information about the
     // compactions that are current running in external queues to populate the RUNNING map.
     // This is to handle the case where the coordinator dies or is restarted at runtime
+    //
+    // Alternatively, we could use the status messages in updateCompactionStatus to rebuild
+    // the RUNNING map.
 
     tserverSet.startListeningForTabletServerChanges();
 
@@ -251,7 +254,7 @@ public class CompactionCoordinator extends AbstractServer
       // Find any running compactions for the tserver
       final List<ExternalCompactionId> toCancel = new ArrayList<>();
       RUNNING.forEach((k, v) -> {
-        if (v.getCompactorAddress().equals(tsi)) {
+        if (v.getTserver().equals(tsi)) {
           toCancel.add(k);
         }
       });
@@ -295,63 +298,84 @@ public class CompactionCoordinator extends AbstractServer
     // CBUG need to use and check for system credentials
     LOG.debug("getCompactionJob " + queueName + " " + compactorAddress);
     String queue = queueName.intern();
-    TServerInstance tserver = null;
-    Long priority = null;
+    TExternalCompactionJob result = null;
     // CBUG Review synchronization on QUEUES
     synchronized (QUEUES) {
       TreeMap<Long,LinkedHashSet<TServerInstance>> m = QUEUES.get(queue);
-      if (null != m) {
-        while (tserver == null) {
+      if (null != m && !m.isEmpty()) {
+        while (result == null) {
+
+          // m could become empty if we have contacted all tservers in this queue and
+          // there are no compactions
+          if (m.isEmpty()) {
+            LOG.debug("No tservers found for queue {}, returning empty job to compactor {}", queue,
+                compactorAddress);
+            result = new TExternalCompactionJob();
+            break;
+          }
+
           // Get the first TServerInstance from the highest priority queue
           Entry<Long,LinkedHashSet<TServerInstance>> entry = m.firstEntry();
-          priority = entry.getKey();
+          Long priority = entry.getKey();
           LinkedHashSet<TServerInstance> tservers = entry.getValue();
-          if (null == tservers || m.isEmpty()) {
+
+          if (null == tservers || tservers.isEmpty()) {
             // Clean up the map entry when no tservers for this queue and priority
             m.remove(entry.getKey(), entry.getValue());
             continue;
           } else {
-            tserver = tservers.iterator().next();
+            TServerInstance tserver = tservers.iterator().next();
+            LOG.debug("Found tserver {} with priority {} for queue {}", tserver.getHostAndPort(),
+                priority, queue);
             // Remove the tserver from the list, we are going to run a compaction on this server
             tservers.remove(tserver);
-            if (tservers.size() == 0) {
+            if (tservers.isEmpty()) {
               // Clean up the map entry when no tservers remaining for this queue and priority
+              // CBUG This may be redundant as cleanup happens in the 'if' clause above
               m.remove(entry.getKey(), entry.getValue());
             }
             HashSet<QueueAndPriority> qp = INDEX.get(tserver);
             qp.remove(QueueAndPriority.get(queue, priority));
-            if (qp.size() == 0) {
+            if (qp.isEmpty()) {
               // Remove the tserver from the index
               INDEX.remove(tserver);
             }
-            break;
+            LOG.debug("Getting compaction for queue {} from tserver {}", queue,
+                tserver.getHostAndPort());
+            // Get a compaction from the tserver
+            TabletClientService.Client client = null;
+            try {
+              client = getTabletServerConnection(tserver);
+              TExternalCompactionJob job = client.reserveCompactionJob(TraceUtil.traceInfo(),
+                  getContext().rpcCreds(), queue, priority, compactorAddress);
+              if (null == job.getExternalCompactionId()) {
+                LOG.debug("No compactions found for queue {} on tserver {}, trying next tserver",
+                    queue, tserver.getHostAndPort(), compactorAddress);
+                continue;
+              }
+              RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()),
+                  new RunningCompaction(job, compactorAddress, tserver));
+              LOG.debug("Returning external job {} to {}", job.externalCompactionId,
+                  compactorAddress);
+              result = job;
+              break;
+            } catch (TException e) {
+              LOG.error(
+                  "Error from tserver {} while trying to reserve compaction, trying next tserver",
+                  ExternalCompactionUtil.getHostPortString(tserver.getHostAndPort()), e);
+            } finally {
+              ThriftUtil.returnClient(client);
+            }
           }
         }
+      } else {
+        LOG.debug("No tservers found for queue {}, returning empty job to compactor {}", queue,
+            compactorAddress);
+        result = new TExternalCompactionJob();
       }
     }
+    return result;
 
-    if (null == tserver) {
-      LOG.debug("No compactions found for queue {}, returning empty job to compactor {}", queue,
-          compactorAddress);
-      return new TExternalCompactionJob();
-    }
-
-    TabletClientService.Client client = null;
-    try {
-      client = getTabletServerConnection(tserver);
-      TExternalCompactionJob job = client.reserveCompactionJob(TraceUtil.traceInfo(),
-          getContext().rpcCreds(), queue, priority, compactorAddress);
-      RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()),
-          new RunningCompaction(job, compactorAddress, tserver));
-      LOG.debug("Returning external job {} to {}", job.externalCompactionId, compactorAddress);
-      return job;
-    } catch (TException e) {
-      LOG.error("Error reserving compaction from tserver {}",
-          ExternalCompactionUtil.getHostPortString(tserver.getHostAndPort()), e);
-      throw e;
-    } finally {
-      ThriftUtil.returnClient(client);
-    }
   }
 
   protected TabletClientService.Client getTabletServerConnection(TServerInstance tserver)
@@ -377,44 +401,52 @@ public class CompactionCoordinator extends AbstractServer
     LOG.info("Compaction cancel requested, id: {}", externalCompactionId);
     RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId));
     if (null == rc) {
-      throw new UnknownCompactionIdException();
+      return;
     }
-    HostAndPort compactor = HostAndPort.fromString(rc.getCompactorAddress());
-    RetryableThriftCall<Void> cancelThriftCall = new RetryableThriftCall<>(1000,
-        RetryableThriftCall.MAX_WAIT_TIME, 0, new RetryableThriftFunction<Void>() {
-          @Override
-          public Void execute() throws TException {
-            Compactor.Client compactorConnection = null;
-            try {
-              compactorConnection = getCompactorConnection(compactor);
-              compactorConnection.cancel(rc.getJob().getExternalCompactionId());
-              return null;
-            } catch (TException e) {
-              throw e;
-            } finally {
-              ThriftUtil.returnClient(compactorConnection);
+    if (!rc.isCompleted()) {
+      HostAndPort compactor = HostAndPort.fromString(rc.getCompactorAddress());
+      RetryableThriftCall<String> cancelThriftCall = new RetryableThriftCall<>(1000,
+          RetryableThriftCall.MAX_WAIT_TIME, 0, new RetryableThriftFunction<String>() {
+            @Override
+            public String execute() throws TException {
+              Compactor.Client compactorConnection = null;
+              try {
+                compactorConnection = getCompactorConnection(compactor);
+                compactorConnection.cancel(rc.getJob().getExternalCompactionId());
+                return "";
+              } catch (TException e) {
+                throw e;
+              } finally {
+                ThriftUtil.returnClient(compactorConnection);
+              }
             }
-          }
-        });
-    try {
-      cancelThriftCall.run();
-    } catch (RetriesExceededException e) {
-      LOG.error("Unable to contact Compactor {} to cancel running compaction {}",
-          rc.getCompactorAddress(), rc.getJob(), e);
+          });
+      try {
+        cancelThriftCall.run();
+      } catch (RetriesExceededException e) {
+        LOG.error("Unable to contact Compactor {} to cancel running compaction {}",
+            rc.getCompactorAddress(), rc.getJob(), e);
+      }
     }
   }
 
+  /**
+   * TServer calls getCompactionStatus to get information about the compaction
+   *
+   * @param externalCompactionId
+   *          id of the external compaction
+   * @return status updates for the compaction, or an empty list if it is not running
+   */
   @Override
   public List<Status> getCompactionStatus(String externalCompactionId) throws TException {
+    List<Status> status = new ArrayList<>();
     RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId));
-    if (null == rc) {
-      throw new UnknownCompactionIdException();
+    if (null != rc) {
+      rc.getUpdates().forEach((k, v) -> {
+        status.add(new Status(v.getTimestamp(), rc.getJob().getExternalCompactionId(),
+            rc.getCompactorAddress(), v.getState(), v.getMessage()));
+      });
     }
-    List<Status> status = new ArrayList<>();
-    rc.getUpdates().forEach((k, v) -> {
-      status.add(new Status(v.getTimestamp(), rc.getJob().getExternalCompactionId(),
-          rc.getCompactorAddress(), v.getState(), v.getMessage()));
-    });
     return status;
   }
 
@@ -434,6 +466,7 @@ public class CompactionCoordinator extends AbstractServer
     RunningCompaction rc = RUNNING.get(ecid);
     if (null != rc) {
       rc.setStats(stats);
+      rc.setCompleted();
     } else {
       LOG.error(
           "Compaction completed called by Compactor for {}, but no running compaction for that id.",
@@ -442,17 +475,19 @@ public class CompactionCoordinator extends AbstractServer
     }
     // Attempt up to ten times to contact the TServer and notify it that the compaction has
     // completed.
-    RetryableThriftCall<Void> completedThriftCall = new RetryableThriftCall<>(1000,
-        RetryableThriftCall.MAX_WAIT_TIME, 10, new RetryableThriftFunction<Void>() {
+    RetryableThriftCall<String> completedThriftCall = new RetryableThriftCall<>(1000,
+        RetryableThriftCall.MAX_WAIT_TIME, 10, new RetryableThriftFunction<String>() {
           @Override
-          public Void execute() throws TException {
+          public String execute() throws TException {
             TabletClientService.Client client = null;
             try {
               client = getTabletServerConnection(rc.getTserver());
               client.compactionJobFinished(TraceUtil.traceInfo(), getContext().rpcCreds(),
                   externalCompactionId, stats.fileSize, stats.entriesWritten);
               RUNNING.remove(ecid, rc);
-              return null;
+              LOG.info("TServer {} notified of compaction {} completion",
+                  rc.getTserver().getHostAndPort(), externalCompactionId);
+              return "";
             } catch (TException e) {
               throw e;
             } finally {
@@ -461,64 +496,6 @@ public class CompactionCoordinator extends AbstractServer
           }
         });
     try {
-      // CBUG Saw the following situation in testing:
-      // 1. Compactor ran a compaction and completed.
-      // 2. Compactor called this method
-      // 3. Thrift timeout occurred and the Compactor retried due to RetryableThriftCall
-      // 4. Upon retry this method returned UnknownCompactionIdException because no entry in RUNNING
-
-      // See Method below where tserver could poll coordinator to see if compaction is completed.
-
-      // "compactor" #38 prio=5 os_prio=0 cpu=157.59ms elapsed=197.99s tid=0x000055ea28438800
-      // nid=0x4dae runnable [0x00007fb0c5f2e000]
-      // java.lang.Thread.State: RUNNABLE
-      // at sun.nio.ch.EPoll.wait(java.base@11.0.10/Native Method)
-      // at sun.nio.ch.EPollSelectorImpl.doSelect(java.base@11.0.10/EPollSelectorImpl.java:120)
-      // at sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base@11.0.10/SelectorImpl.java:124)
-      // - locked <0x00000000f449acc0> (a sun.nio.ch.Util$2)
-      // - locked <0x00000000f449aa60> (a sun.nio.ch.EPollSelectorImpl)
-      // at sun.nio.ch.SelectorImpl.select(java.base@11.0.10/SelectorImpl.java:136)
-      // at
-      // org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:336)
-      // at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
-      // at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
-      // at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
-      // at java.io.FilterInputStream.read(java.base@11.0.10/FilterInputStream.java:133)
-      // at java.io.BufferedInputStream.fill(java.base@11.0.10/BufferedInputStream.java:252)
-      // at java.io.BufferedInputStream.read1(java.base@11.0.10/BufferedInputStream.java:292)
-      // at java.io.BufferedInputStream.read(java.base@11.0.10/BufferedInputStream.java:351)
-      // - locked <0x00000000f466c028> (a java.io.BufferedInputStream)
-      // at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
-      // at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
-      // at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:132)
-      // at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:100)
-      // at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
-      // at
-      // org.apache.accumulo.core.clientImpl.ThriftTransportPool$CachedTTransport.readAll(ThriftTransportPool.java:546)
-      // at org.apache.thrift.protocol.TCompactProtocol.readByte(TCompactProtocol.java:637)
-      // at org.apache.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:505)
-      // at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
-      // at
-      // org.apache.accumulo.core.compaction.thrift.CompactionCoordinator$Client.recv_compactionCompleted(CompactionCoordinator.java:118)
-      // at
-      // org.apache.accumulo.core.compaction.thrift.CompactionCoordinator$Client.compactionCompleted(CompactionCoordinator.java:104)
-      // at org.apache.accumulo.compactor.Compactor$3.execute(Compactor.java:296)
-      // at org.apache.accumulo.compactor.Compactor$3.execute(Compactor.java:291)
-      // at
-      // org.apache.accumulo.server.compaction.RetryableThriftCall.run(RetryableThriftCall.java:102)
-      // at org.apache.accumulo.compactor.Compactor.updateCompactionCompleted(Compactor.java:304)
-      // at org.apache.accumulo.compactor.Compactor.run(Compactor.java:509)
-
-      // "CompactionCoordinator-ClientPool-Worker-1" #47 daemon prio=5 os_prio=0 cpu=70.37ms
-      // elapsed=243.97s tid=0x00005590266cb800 nid=0x4db3 waiting on condition [0x00007f4c8cb6c000]
-      // java.lang.Thread.State: TIMED_WAITING (sleeping)
-      // at java.lang.Thread.sleep(java.base@11.0.10/Native Method)
-      // at org.apache.accumulo.fate.util.UtilWaitThread.sleep(UtilWaitThread.java:33)
-      // at
-      // org.apache.accumulo.server.compaction.RetryableThriftCall.run(RetryableThriftCall.java:113)
-      // at
-      // org.apache.accumulo.coordinator.CompactionCoordinator.compactionCompleted(CompactionCoordinator.java:462)
-
       completedThriftCall.run();
     } catch (RetriesExceededException e) {
       // TODO: What happens if tserver is no longer hosting tablet? I wonder if we should not notify
@@ -548,13 +525,14 @@ public class CompactionCoordinator extends AbstractServer
    *
    *
    * @param externalCompactionId
-   * @return CompactionStats or null if not completed
-   * @throws TException
+   * @return CompactionStats
+   * @throws UnknownCompactionIdException
+   *           if compaction is not running
    */
   public CompactionStats isCompactionCompleted(String externalCompactionId) throws TException {
     var ecid = ExternalCompactionId.of(externalCompactionId);
     RunningCompaction rc = RUNNING.get(ecid);
-    if (null != rc && null != rc.getStats()) {
+    if (null != rc && rc.isCompleted()) {
       RUNNING.remove(ecid, rc);
       return rc.getStats();
     } else if (rc == null) {
@@ -591,6 +569,9 @@ public class CompactionCoordinator extends AbstractServer
     if (null != rc) {
       rc.addUpdate(timestamp, message, state);
     } else {
+      // TODO: If the Coordinator was restarted, we could use these status messages
+      // to re-populate the RUNNING map. This would require the job, compactor address
+      // and TServerInstance
       throw new UnknownCompactionIdException();
     }
   }
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java
index cc36f50..ea35349 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.coordinator;
 
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.compaction.thrift.CompactionState;
 import org.apache.accumulo.core.metadata.TServerInstance;
@@ -31,7 +32,8 @@ public class RunningCompaction {
   private final TExternalCompactionJob job;
   private final String compactorAddress;
   private final TServerInstance tserver;
-  private Map<Long,CompactionUpdate> updates = new TreeMap<>();
+  private final Map<Long,CompactionUpdate> updates = new TreeMap<>();
+  private final AtomicBoolean completed = new AtomicBoolean(Boolean.FALSE);
   private CompactionStats stats = null;
 
   RunningCompaction(TExternalCompactionJob job, String compactorAddress, TServerInstance tserver) {
@@ -69,4 +71,12 @@ public class RunningCompaction {
     return tserver;
   }
 
+  public boolean isCompleted() {
+    return completed.get();
+  }
+
+  public void setCompleted() {
+    completed.compareAndSet(false, true);
+  }
+
 }
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
index bfda224..fad132b 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
@@ -18,6 +18,9 @@
  */
 package org.apache.accumulo.compactor;
 
+import java.io.Closeable;
+import java.io.IOException;
+
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.TableId;
@@ -33,14 +36,22 @@ import org.apache.accumulo.server.compaction.Compactor.CompactionEnv;
 import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
 import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
 
-public class CompactionEnvironment implements CompactionEnv {
+public class CompactionEnvironment implements Closeable, CompactionEnv {
 
   private final ServerContext context;
   private final CompactionJobHolder jobHolder;
+  private final SharedRateLimiterFactory limiter;
 
   CompactionEnvironment(ServerContext context, CompactionJobHolder jobHolder) {
     this.context = context;
     this.jobHolder = jobHolder;
+    this.limiter = SharedRateLimiterFactory.getInstance(this.context.getConfiguration());
+  }
+
+  @Override
+  public void close() throws IOException {
+    limiter.remove("read_rate_limiter");
+    limiter.remove("write_rate_limiter");
   }
 
   @Override
@@ -55,14 +66,12 @@ public class CompactionEnvironment implements CompactionEnv {
 
   @Override
   public RateLimiter getReadLimiter() {
-    return SharedRateLimiterFactory.getInstance(context.getConfiguration())
-        .create("read_rate_limiter", () -> jobHolder.getJob().getReadRate());
+    return limiter.create("read_rate_limiter", () -> jobHolder.getJob().getReadRate());
   }
 
   @Override
   public RateLimiter getWriteLimiter() {
-    return SharedRateLimiterFactory.getInstance(context.getConfiguration())
-        .create("write_rate_limiter", () -> jobHolder.getJob().getWriteRate());
+    return limiter.create("write_rate_limiter", () -> jobHolder.getJob().getWriteRate());
   }
 
   @Override
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index f1d2454..7922c4f 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -65,7 +65,6 @@ import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.ServerOpts;
 import org.apache.accumulo.server.compaction.CompactionInfo;
-import org.apache.accumulo.server.compaction.Compactor.CompactionEnv;
 import org.apache.accumulo.server.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.server.compaction.RetryableThriftCall;
 import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
@@ -287,14 +286,14 @@ public class Compactor extends AbstractServer
    */
   protected void updateCompactionCompleted(TExternalCompactionJob job, CompactionStats stats)
       throws RetriesExceededException {
-    RetryableThriftCall<Void> thriftCall = new RetryableThriftCall<>(1000,
-        RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<Void>() {
+    RetryableThriftCall<String> thriftCall = new RetryableThriftCall<>(1000,
+        RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<String>() {
           @Override
-          public Void execute() throws TException {
+          public String execute() throws TException {
             try {
               coordinatorClient.compareAndSet(null, getCoordinatorClient());
               coordinatorClient.get().compactionCompleted(job.getExternalCompactionId(), stats);
-              return null;
+              return "";
             } catch (TException e) {
               ThriftUtil.returnClient(coordinatorClient.getAndSet(null));
               throw e;
@@ -379,7 +378,6 @@ public class Compactor extends AbstractServer
           final TableId tableId = TableId.of(new String(job.getExtent().getTable(), UTF_8));
           final TableConfiguration tConfig = getContext().getTableConfiguration(tableId);
           final TabletFile outputFile = new TabletFile(new Path(job.getOutputFile()));
-          final CompactionEnv cenv = new CompactionEnvironment(getContext(), jobHolder);
 
           final Map<StoredTabletFile,DataFileValue> files = new TreeMap<>();
           job.getFiles().forEach(f -> {
@@ -392,20 +390,22 @@ public class Compactor extends AbstractServer
           job.getIteratorSettings().getIterators()
               .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis)));
 
-          org.apache.accumulo.server.compaction.Compactor compactor =
-              new org.apache.accumulo.server.compaction.Compactor(getContext(),
-                  KeyExtent.fromThrift(job.getExtent()), files, outputFile,
-                  job.isPropagateDeletes(), cenv, iters, tConfig);
-
-          LOG.info("Starting compactor");
-          started.countDown();
-
-          org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call();
-          CompactionStats cs = new CompactionStats();
-          cs.setEntriesRead(stat.getEntriesRead());
-          cs.setEntriesWritten(stat.getEntriesWritten());
-          cs.setFileSize(stat.getFileSize());
-          jobHolder.setStats(cs);
+          try (CompactionEnvironment cenv = new CompactionEnvironment(getContext(), jobHolder)) {
+            org.apache.accumulo.server.compaction.Compactor compactor =
+                new org.apache.accumulo.server.compaction.Compactor(getContext(),
+                    KeyExtent.fromThrift(job.getExtent()), files, outputFile,
+                    job.isPropagateDeletes(), cenv, iters, tConfig);
+
+            LOG.info("Starting compactor");
+            started.countDown();
+
+            org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call();
+            CompactionStats cs = new CompactionStats();
+            cs.setEntriesRead(stat.getEntriesRead());
+            cs.setEntriesWritten(stat.getEntriesWritten());
+            cs.setFileSize(stat.getFileSize());
+            jobHolder.setStats(cs);
+          }
           LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId());
           // Update state when completed
           updateCompactionState(job, CompactionState.SUCCEEDED,
@@ -504,28 +504,40 @@ public class Compactor extends AbstractServer
               }
             }
           }
-          try {
-            compactionThread.join();
-            this.updateCompactionCompleted(job, jobHolder.getStats());
-          } catch (InterruptedException e) {
-            LOG.error(
-                "Compactor thread was interrupted waiting for compaction to finish, cancelling job",
-                e);
+          compactionThread.join();
+
+          if (compactionThread.isInterrupted()) {
+            LOG.warn("Compaction thread was interrupted, sending CANCELLED state");
+            try {
+              updateCompactionState(job, CompactionState.CANCELLED, "Compaction cancelled");
+              // CBUG Might need to call updateCompactionCompleted or the TServer will not
+              // get notified that the compaction was cancelled
+            } catch (RetriesExceededException e) {
+              LOG.error("Error updating coordinator with compaction cancellation.", e);
+            }
+          } else if (err.get() != null) {
             try {
-              cancel(job.getExternalCompactionId());
-            } catch (TException e1) {
-              LOG.error("Error cancelling compaction.", e1);
+              updateCompactionState(job, CompactionState.FAILED,
+                  "Compaction failed due to: " + err.get().getMessage());
+              // CBUG Might need to call updateCompactionCompleted or the TServer will not
+              // get notified that the compaction failed
+            } catch (RetriesExceededException e) {
+              LOG.error("Error updating coordinator with compaction failure.", e);
             }
-          } catch (RetriesExceededException e) {
-            LOG.error(
-                "Error updating coordinator with compaction completion, cancelling compaction.", e);
+          } else {
             try {
-              cancel(job.getExternalCompactionId());
-            } catch (TException e1) {
-              LOG.error("Error cancelling compaction.", e1);
+              this.updateCompactionCompleted(job, jobHolder.getStats());
+            } catch (RetriesExceededException e) {
+              LOG.error(
+                  "Error updating coordinator with compaction completion, cancelling compaction.",
+                  e);
+              try {
+                cancel(job.getExternalCompactionId());
+              } catch (TException e1) {
+                LOG.error("Error cancelling compaction.", e1);
+              }
             }
           }
-
         } catch (InterruptedException e1) {
           LOG.error(
               "Compactor thread was interrupted waiting for compaction to start, cancelling job",
@@ -537,24 +549,6 @@ public class Compactor extends AbstractServer
           }
         }
 
-        if (compactionThread.isInterrupted()) {
-          LOG.warn("Compaction thread was interrupted, sending CANCELLED state");
-          try {
-            updateCompactionState(job, CompactionState.CANCELLED, "Compaction cancelled");
-          } catch (RetriesExceededException e) {
-            LOG.error("Error updating coordinator with compaction cancellation.", e);
-          }
-        }
-
-        Throwable thrown = err.get();
-        if (thrown != null) {
-          try {
-            updateCompactionState(job, CompactionState.FAILED,
-                "Compaction failed due to: " + thrown.getMessage());
-          } catch (RetriesExceededException e) {
-            LOG.error("Error updating coordinator with compaction failure.", e);
-          }
-        }
       }
 
     } catch (Exception e) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
index f640e37..d8cebd1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
@@ -1687,8 +1687,7 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe
       return extCompaction.toThrift();
     }
 
-    // CBUG thrift may not support null return types https://thrift.apache.org/docs/features.html
-    return null;
+    return new TExternalCompactionJob();
   }
 
   @Override
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
index e2b04d9..3c02127 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
@@ -47,6 +47,8 @@ public class ExternalCompactionJob {
   private CompactionKind kind;
   private List<IteratorSetting> iters;
 
+  public ExternalCompactionJob() {}
+
   public ExternalCompactionJob(Set<StoredTabletFile> jobFiles, boolean propogateDeletes,
       TabletFile compactTmpName, KeyExtent extent, ExternalCompactionId externalCompactionId,
       long priority, CompactionKind kind, List<IteratorSetting> iters) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index ecd7083..b9439a7 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -771,9 +771,11 @@ public class CompactableImpl implements Compactable {
         TabletLogger.compacted(getExtent(), cInfo.job, metaFile);
       } catch (Exception e) {
         metaFile = null;
+        log.error("Error committing external compaction {}", extCompactionId, e);
         throw new RuntimeException(e);
       } finally {
         completeCompaction(cInfo.job, cInfo.jobFiles, metaFile);
+        log.debug("Completed commit of external compaction{}", extCompactionId);
       }
     } else {
       log.debug("Ignoring request to commit external compaction that is unknown {}",