Posted to commits@accumulo.apache.org by ct...@apache.org on 2022/09/29 18:20:39 UTC

[accumulo] branch main updated: Refactor hot methods (#2811)

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new cd0113eb19 Refactor hot methods (#2811)
cd0113eb19 is described below

commit cd0113eb1953cc4ca785e12d172c3f17437c3899
Author: Dom G <do...@apache.org>
AuthorDate: Thu Sep 29 14:20:34 2022 -0400

    Refactor hot methods (#2811)
    
    * Reduce size and time complexity of TabletClientHandler.flush
    * extract new method for determining locality group in LocalityGroupIterator
    * remove questionable use of streams
    * refactor ScanServer.reserveFiles - no longer hot
    * refactor SimpleLoadBalancer.move() - no longer hot
    
    Co-authored-by: Keith Turner <kt...@apache.org>
---
 .../accumulo/core/file/rfile/RelativeKey.java      |  94 +++++--------
 .../system/LocalityGroupIterator.java              |  71 +++++-----
 .../core/spi/balancer/SimpleLoadBalancer.java      |  56 ++++----
 .../org/apache/accumulo/server/fs/FileManager.java |  29 ++--
 .../org/apache/accumulo/tserver/ScanServer.java    |  29 ++--
 .../accumulo/tserver/TabletClientHandler.java      |  36 ++---
 .../apache/accumulo/tserver/scan/LookupTask.java   |  74 +++++-----
 .../accumulo/tserver/tablet/CompactableImpl.java   | 151 +++++++++++----------
 8 files changed, 263 insertions(+), 277 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
index ef37ac1b9e..1398522ee0 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.core.file.rfile;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
@@ -84,42 +85,24 @@ public class RelativeKey implements Writable {
     fieldsSame = 0;
     fieldsPrefixed = 0;
 
-    ByteSequence prevKeyScratch;
-    ByteSequence keyScratch;
-
     if (prevKey != null) {
 
-      prevKeyScratch = prevKey.getRowData();
-      keyScratch = key.getRowData();
-      rowCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
-      if (rowCommonPrefixLen == -1)
-        fieldsSame |= ROW_SAME;
-      else if (rowCommonPrefixLen > 1)
-        fieldsPrefixed |= ROW_COMMON_PREFIX;
+      ByteSequence prevKeyScratch = prevKey.getRowData();
+      ByteSequence keyScratch = key.getRowData();
+      rowCommonPrefixLen =
+          getCommonPrefixLen(prevKeyScratch, keyScratch, ROW_SAME, ROW_COMMON_PREFIX);
 
       prevKeyScratch = prevKey.getColumnFamilyData();
       keyScratch = key.getColumnFamilyData();
-      cfCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
-      if (cfCommonPrefixLen == -1)
-        fieldsSame |= CF_SAME;
-      else if (cfCommonPrefixLen > 1)
-        fieldsPrefixed |= CF_COMMON_PREFIX;
+      cfCommonPrefixLen = getCommonPrefixLen(prevKeyScratch, keyScratch, CF_SAME, CF_COMMON_PREFIX);
 
       prevKeyScratch = prevKey.getColumnQualifierData();
       keyScratch = key.getColumnQualifierData();
-      cqCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
-      if (cqCommonPrefixLen == -1)
-        fieldsSame |= CQ_SAME;
-      else if (cqCommonPrefixLen > 1)
-        fieldsPrefixed |= CQ_COMMON_PREFIX;
+      cqCommonPrefixLen = getCommonPrefixLen(prevKeyScratch, keyScratch, CQ_SAME, CQ_COMMON_PREFIX);
 
       prevKeyScratch = prevKey.getColumnVisibilityData();
       keyScratch = key.getColumnVisibilityData();
-      cvCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
-      if (cvCommonPrefixLen == -1)
-        fieldsSame |= CV_SAME;
-      else if (cvCommonPrefixLen > 1)
-        fieldsPrefixed |= CV_COMMON_PREFIX;
+      cvCommonPrefixLen = getCommonPrefixLen(prevKeyScratch, keyScratch, CV_SAME, CV_COMMON_PREFIX);
 
       tsDiff = key.getTimestamp() - prevKey.getTimestamp();
       if (tsDiff == 0)
@@ -135,6 +118,17 @@ public class RelativeKey implements Writable {
       fieldsSame |= DELETED;
   }
 
+  private int getCommonPrefixLen(ByteSequence prevKeyScratch, ByteSequence keyScratch,
+      byte fieldBit, byte commonPrefix) {
+    int commonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
+    if (commonPrefixLen == -1) {
+      fieldsSame |= fieldBit;
+    } else if (commonPrefixLen > 1) {
+      fieldsPrefixed |= commonPrefix;
+    }
+    return commonPrefixLen;
+  }
+
   /**
    *
    * @return -1 (exact match) or the number of bytes in common
@@ -174,40 +168,13 @@ public class RelativeKey implements Writable {
       fieldsPrefixed = 0;
     }
 
-    byte[] row, cf, cq, cv;
-    long ts;
-
-    if ((fieldsSame & ROW_SAME) == ROW_SAME) {
-      row = prevKey.getRowData().toArray();
-    } else if ((fieldsPrefixed & ROW_COMMON_PREFIX) == ROW_COMMON_PREFIX) {
-      row = readPrefix(in, prevKey.getRowData());
-    } else {
-      row = read(in);
-    }
+    final byte[] row, cf, cq, cv;
+    final long ts;
 
-    if ((fieldsSame & CF_SAME) == CF_SAME) {
-      cf = prevKey.getColumnFamilyData().toArray();
-    } else if ((fieldsPrefixed & CF_COMMON_PREFIX) == CF_COMMON_PREFIX) {
-      cf = readPrefix(in, prevKey.getColumnFamilyData());
-    } else {
-      cf = read(in);
-    }
-
-    if ((fieldsSame & CQ_SAME) == CQ_SAME) {
-      cq = prevKey.getColumnQualifierData().toArray();
-    } else if ((fieldsPrefixed & CQ_COMMON_PREFIX) == CQ_COMMON_PREFIX) {
-      cq = readPrefix(in, prevKey.getColumnQualifierData());
-    } else {
-      cq = read(in);
-    }
-
-    if ((fieldsSame & CV_SAME) == CV_SAME) {
-      cv = prevKey.getColumnVisibilityData().toArray();
-    } else if ((fieldsPrefixed & CV_COMMON_PREFIX) == CV_COMMON_PREFIX) {
-      cv = readPrefix(in, prevKey.getColumnVisibilityData());
-    } else {
-      cv = read(in);
-    }
+    row = getData(in, ROW_SAME, ROW_COMMON_PREFIX, () -> prevKey.getRowData());
+    cf = getData(in, CF_SAME, CF_COMMON_PREFIX, () -> prevKey.getColumnFamilyData());
+    cq = getData(in, CQ_SAME, CQ_COMMON_PREFIX, () -> prevKey.getColumnQualifierData());
+    cv = getData(in, CV_SAME, CV_COMMON_PREFIX, () -> prevKey.getColumnVisibilityData());
 
     if ((fieldsSame & TS_SAME) == TS_SAME) {
       ts = prevKey.getTimestamp();
@@ -221,6 +188,17 @@ public class RelativeKey implements Writable {
     this.prevKey = this.key;
   }
 
+  private byte[] getData(DataInput in, byte fieldBit, byte commonPrefix,
+      Supplier<ByteSequence> data) throws IOException {
+    if ((fieldsSame & fieldBit) == fieldBit) {
+      return data.get().toArray();
+    } else if ((fieldsPrefixed & commonPrefix) == commonPrefix) {
+      return readPrefix(in, data.get());
+    } else {
+      return read(in);
+    }
+  }
+
   public static class SkippR {
     RelativeKey rk;
     int skipped;
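
The RelativeKey change above collapses four nearly identical compare-and-flag branches into helpers parameterized by the field's bit flags, with a Supplier deferring access to the previous key's bytes until they are actually needed. A minimal, self-contained sketch of that pattern follows; the class, flags, and field names are illustrative, not the actual RelativeKey members, and the prefix branch is simplified.

    import java.util.function.Supplier;

    public class FieldDecodeSketch {

      // illustrative bit flags, not the real RelativeKey constants
      private static final byte ROW_SAME = 0x01;
      private static final byte ROW_COMMON_PREFIX = 0x02;

      private byte fieldsSame;
      private byte fieldsPrefixed;

      // one helper replaces four copies of the same same/prefix/full-read branch;
      // the Supplier defers fetching the previous key's bytes until they are needed
      private byte[] getField(byte sameBit, byte prefixBit, Supplier<byte[]> previous,
          Supplier<byte[]> readFromInput) {
        if ((fieldsSame & sameBit) == sameBit) {
          return previous.get();        // field identical to the previous key, reuse it
        } else if ((fieldsPrefixed & prefixBit) == prefixBit) {
          return previous.get();        // simplified: the real code appends a newly read suffix
        } else {
          return readFromInput.get();   // nothing shared, read the whole field
        }
      }

      public static void main(String[] args) {
        FieldDecodeSketch sketch = new FieldDecodeSketch();
        sketch.fieldsSame = ROW_SAME;
        byte[] row = sketch.getField(ROW_SAME, ROW_COMMON_PREFIX,
            () -> "prev-row".getBytes(), () -> "new-row".getBytes());
        System.out.println(new String(row)); // prints prev-row
      }
    }
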
diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/LocalityGroupIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/LocalityGroupIterator.java
index 010428797f..58c358acd5 100644
--- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/LocalityGroupIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/LocalityGroupIterator.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -163,26 +164,26 @@ public class LocalityGroupIterator extends HeapIterator implements Interruptible
       Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
     hiter.clear();
 
-    Set<ByteSequence> cfSet;
-    if (columnFamilies.isEmpty()) {
-      cfSet = Collections.emptySet();
-    } else {
-      if (columnFamilies instanceof Set<?>) {
-        cfSet = (Set<ByteSequence>) columnFamilies;
-      } else {
-        cfSet = new HashSet<>();
-        cfSet.addAll(columnFamilies);
-      }
-    }
+    final Set<ByteSequence> cfSet = getCfSet(columnFamilies);
 
     // determine the set of groups to use
-    Collection<LocalityGroup> groups = Collections.emptyList();
+    final Collection<LocalityGroup> groups = getLocalityGroups(lgContext, inclusive, cfSet);
 
+    for (LocalityGroup lgr : groups) {
+      lgr.getIterator().seek(range, EMPTY_CF_SET, false);
+      hiter.addSource(lgr.getIterator());
+    }
+
+    return groups;
+  }
+
+  private static Collection<LocalityGroup> getLocalityGroups(LocalityGroupContext lgContext,
+      boolean inclusive, Set<ByteSequence> cfSet) {
+
+    final Collection<LocalityGroup> groups;
     // if no column families specified, then include all groups unless !inclusive
     if (cfSet.isEmpty()) {
-      if (!inclusive) {
-        groups = lgContext.groups;
-      }
+      groups = inclusive ? List.of() : lgContext.groups;
     } else {
       groups = new HashSet<>();
 
@@ -207,35 +208,33 @@ public class LocalityGroupIterator extends HeapIterator implements Interruptible
        * other
        */
       if (!inclusive) {
-        for (Entry<ByteSequence,LocalityGroup> entry : lgContext.groupByCf.entrySet()) {
-          if (!cfSet.contains(entry.getKey())) {
-            groups.add(entry.getValue());
-          }
-        }
+        lgContext.groupByCf.entrySet().stream().filter(entry -> !cfSet.contains(entry.getKey()))
+            .map(Entry::getValue).forEach(groups::add);
       } else if (lgContext.groupByCf.size() <= cfSet.size()) {
-        for (Entry<ByteSequence,LocalityGroup> entry : lgContext.groupByCf.entrySet()) {
-          if (cfSet.contains(entry.getKey())) {
-            groups.add(entry.getValue());
-          }
-        }
+        lgContext.groupByCf.entrySet().stream().filter(entry -> cfSet.contains(entry.getKey()))
+            .map(Entry::getValue).forEach(groups::add);
       } else {
-        for (ByteSequence cf : cfSet) {
-          LocalityGroup group = lgContext.groupByCf.get(cf);
-          if (group != null) {
-            groups.add(group);
-          }
-        }
+        cfSet.stream().map(lgContext.groupByCf::get).filter(Objects::nonNull).forEach(groups::add);
       }
     }
 
-    for (LocalityGroup lgr : groups) {
-      lgr.getIterator().seek(range, EMPTY_CF_SET, false);
-      hiter.addSource(lgr.getIterator());
-    }
-
     return groups;
   }
 
+  private static Set<ByteSequence> getCfSet(Collection<ByteSequence> columnFamilies) {
+    final Set<ByteSequence> cfSet;
+    if (columnFamilies.isEmpty()) {
+      cfSet = Collections.emptySet();
+    } else {
+      if (columnFamilies instanceof Set<?>) {
+        cfSet = (Set<ByteSequence>) columnFamilies;
+      } else {
+        cfSet = Set.copyOf(columnFamilies);
+      }
+    }
+    return cfSet;
+  }
+
   /**
    * This seek method will reuse the supplied LocalityGroupSeekCache if it can. Otherwise it will
    * delegate to the _seek method.
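
The LocalityGroupIterator change extracts the column-family set and the group selection into helpers and swaps hand-rolled loops for equivalent stream pipelines. A small standalone sketch of the inclusive/exclusive filtering idea, assuming a plain Map in place of groupByCf and made-up names:

    import java.util.Map;
    import java.util.Objects;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class GroupFilterSketch {

      // pick values whose keys are (or are not) in the requested set, mirroring the
      // inclusive / !inclusive branches in _seek
      static Set<String> pickGroups(Map<String,String> groupByCf, Set<String> cfSet,
          boolean inclusive) {
        if (cfSet.isEmpty()) {
          return inclusive ? Set.of() : Set.copyOf(groupByCf.values());
        }
        if (!inclusive) {
          return groupByCf.entrySet().stream().filter(e -> !cfSet.contains(e.getKey()))
              .map(Map.Entry::getValue).collect(Collectors.toSet());
        }
        // inclusive: look up each requested family and drop the ones with no group
        return cfSet.stream().map(groupByCf::get).filter(Objects::nonNull)
            .collect(Collectors.toSet());
      }

      public static void main(String[] args) {
        Map<String,String> byCf = Map.of("a", "g1", "b", "g2", "c", "g3");
        System.out.println(pickGroups(byCf, Set.of("a", "c"), true));  // contains g1 and g3
        System.out.println(pickGroups(byCf, Set.of("a", "c"), false)); // contains g2
      }
    }
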
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java
index 7a0111c2c9..4738485ad0 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.core.spi.balancer;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 
+import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -245,31 +246,8 @@ public class SimpleLoadBalancer implements TabletBalancer {
     Map<TableId,Integer> tooLittleMap = tabletCountsPerTable(tooLittle.status);
 
     for (int i = 0; i < count; i++) {
-      TableId table;
-      Integer tooLittleCount;
-      if (tableToBalance == null) {
-        // find a table to migrate
-        // look for an uneven table count
-        int biggestDifference = 0;
-        TableId biggestDifferenceTable = null;
-        for (var tableEntry : tooMuchMap.entrySet()) {
-          TableId tableID = tableEntry.getKey();
-          tooLittleMap.putIfAbsent(tableID, 0);
-          int diff = tableEntry.getValue() - tooLittleMap.get(tableID);
-          if (diff > biggestDifference) {
-            biggestDifference = diff;
-            biggestDifferenceTable = tableID;
-          }
-        }
-        if (biggestDifference < 2) {
-          table = busiest(tooMuch.status.getTableMap());
-        } else {
-          table = biggestDifferenceTable;
-        }
-      } else {
-        // just balance the given table
-        table = tableToBalance;
-      }
+      TableId table = getTableToMigrate(tooMuch, tooMuchMap, tooLittleMap);
+
       Map<TabletId,TabletStatistics> onlineTabletsForTable = donerTabletStats.get(table);
       try {
         if (onlineTabletsForTable == null) {
@@ -297,10 +275,7 @@ public class SimpleLoadBalancer implements TabletBalancer {
        * is only one tabletserver that holds all of the tablets. Here we check to see if in fact
        * that is the case and if so set the value to 0.
        */
-      tooLittleCount = tooLittleMap.get(table);
-      if (tooLittleCount == null) {
-        tooLittleCount = 0;
-      }
+      Integer tooLittleCount = tooLittleMap.getOrDefault(table, 0);
       tooLittleMap.put(table, tooLittleCount + 1);
       tooMuch.count--;
       tooLittle.count++;
@@ -309,6 +284,29 @@ public class SimpleLoadBalancer implements TabletBalancer {
     return result;
   }
 
+  private TableId getTableToMigrate(ServerCounts tooMuch, Map<TableId,Integer> tooMuchMap,
+      Map<TableId,Integer> tooLittleMap) {
+
+    if (tableToBalance != null) {
+      return tableToBalance;
+    }
+
+    // find a table to migrate
+    // look for an uneven table count
+    Entry<TableId,Integer> biggestEntry = tooMuchMap.entrySet().stream().map(entry -> {
+      TableId tableID = entry.getKey();
+      int diff = entry.getValue() - tooLittleMap.getOrDefault(tableID, 0);
+      return new SimpleEntry<>(tableID, diff); // map the table count to the difference
+    }).max(Entry.comparingByValue()) // get the largest difference
+        .orElseGet(() -> new SimpleEntry<>(null, 0));
+
+    if (biggestEntry.getValue() < 2) {
+      return busiest(tooMuch.status.getTableMap());
+    } else {
+      return biggestEntry.getKey();
+    }
+  }
+
   protected List<TabletStatistics> getOnlineTabletsForTable(TabletServerId tabletServerId,
       TableId tableId) throws AccumuloSecurityException, AccumuloException {
     return environment.listOnlineTabletsForTable(tabletServerId, tableId);
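
The SimpleLoadBalancer change folds the "biggest tablet-count difference" loop into a stream that maps each table to its difference and takes the maximum. The same idiom in isolation, with made-up table names and counts:

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Map;
    import java.util.Map.Entry;

    public class BiggestDifferenceSketch {

      public static void main(String[] args) {
        Map<String,Integer> tooMuch = Map.of("t1", 10, "t2", 7, "t3", 4);
        Map<String,Integer> tooLittle = Map.of("t1", 9, "t2", 2);

        // map each table to (table, tooMuch - tooLittle) and keep the largest difference
        Entry<String,Integer> biggest = tooMuch.entrySet().stream()
            .map(e -> new SimpleEntry<>(e.getKey(),
                e.getValue() - tooLittle.getOrDefault(e.getKey(), 0)))
            .max(Entry.comparingByValue())
            .orElseGet(() -> new SimpleEntry<>(null, 0));

        System.out.println(biggest); // t2=5, since 7 - 2 is the biggest gap
      }
    }
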
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
index e772f8b59f..0d3c85fed5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
@@ -504,20 +504,13 @@ public class FileManager {
 
       ArrayList<InterruptibleIterator> iters = new ArrayList<>();
 
-      boolean sawTimeSet = false;
-      for (DataFileValue dfv : files.values()) {
-        if (dfv.isTimeSet()) {
-          sawTimeSet = true;
-          break;
-        }
-      }
+      boolean sawTimeSet = files.values().stream().anyMatch(DataFileValue::isTimeSet);
 
       for (Entry<FileSKVIterator,String> entry : newlyReservedReaders.entrySet()) {
-        FileSKVIterator reader = entry.getKey();
+        FileSKVIterator source = entry.getKey();
         String filename = entry.getValue();
         InterruptibleIterator iter;
 
-        FileSKVIterator source = reader;
         if (samplerConfig != null) {
           source = source.getSample(samplerConfig);
           if (source == null) {
@@ -525,16 +518,8 @@ public class FileManager {
           }
         }
 
-        if (detachable) {
-          FileDataSource fds = new FileDataSource(filename, source);
-          dataSources.add(fds);
-          SourceSwitchingIterator ssi = new SourceSwitchingIterator(fds);
-          iter = new ProblemReportingIterator(context, tablet.tableId(), filename,
-              continueOnFailure, ssi);
-        } else {
-          iter = new ProblemReportingIterator(context, tablet.tableId(), filename,
-              continueOnFailure, source);
-        }
+        iter = new ProblemReportingIterator(context, tablet.tableId(), filename, continueOnFailure,
+            detachable ? getSsi(filename, source) : source);
 
         if (sawTimeSet) {
           // constructing FileRef is expensive so avoid if not needed
@@ -550,6 +535,12 @@ public class FileManager {
       return iters;
     }
 
+    private SourceSwitchingIterator getSsi(String filename, FileSKVIterator source) {
+      FileDataSource fds = new FileDataSource(filename, source);
+      dataSources.add(fds);
+      return new SourceSwitchingIterator(fds);
+    }
+
     public synchronized void detach() {
 
       releaseReaders(tablet, tabletReservedReaders, false);
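
The FileManager change replaces a flag-and-break loop with Stream.anyMatch, which short-circuits the same way. A tiny equivalent, using a stand-in class instead of DataFileValue:

    import java.util.List;

    public class AnyMatchSketch {

      // stand-in for DataFileValue; only the boolean accessor matters here
      static class FileValue {
        private final boolean timeSet;
        FileValue(boolean timeSet) { this.timeSet = timeSet; }
        boolean isTimeSet() { return timeSet; }
      }

      public static void main(String[] args) {
        List<FileValue> files = List.of(new FileValue(false), new FileValue(true));

        // anyMatch stops at the first hit, just like the old flag-and-break loop
        boolean sawTimeSet = files.stream().anyMatch(FileValue::isTimeSet);
        System.out.println(sawTimeSet); // true
      }
    }
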
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index ce38d7161a..15574f3657 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -653,20 +653,7 @@ public class ScanServer extends AbstractServer
       throw new NoSuchScanIDException();
     }
 
-    Set<StoredTabletFile> scanSessionFiles;
-
-    if (session instanceof SingleScanSession) {
-      var sss = (SingleScanSession) session;
-      scanSessionFiles =
-          Set.copyOf(session.getTabletResolver().getTablet(sss.extent).getDatafiles().keySet());
-    } else if (session instanceof MultiScanSession) {
-      var mss = (MultiScanSession) session;
-      scanSessionFiles = mss.exents.stream()
-          .flatMap(e -> mss.getTabletResolver().getTablet(e).getDatafiles().keySet().stream())
-          .collect(Collectors.toUnmodifiableSet());
-    } else {
-      throw new IllegalArgumentException("Unknown session type " + session.getClass().getName());
-    }
+    Set<StoredTabletFile> scanSessionFiles = getScanSessionFiles(session);
 
     long myReservationId = nextScanReservationId.incrementAndGet();
     // we are only reserving if the files already exists in reservedFiles, so only need the read
@@ -698,6 +685,20 @@ public class ScanServer extends AbstractServer
     return new ScanReservation(scanSessionFiles, myReservationId);
   }
 
+  private static Set<StoredTabletFile> getScanSessionFiles(ScanSession session) {
+    if (session instanceof SingleScanSession) {
+      var sss = (SingleScanSession) session;
+      return Set.copyOf(session.getTabletResolver().getTablet(sss.extent).getDatafiles().keySet());
+    } else if (session instanceof MultiScanSession) {
+      var mss = (MultiScanSession) session;
+      return mss.exents.stream()
+          .flatMap(e -> mss.getTabletResolver().getTablet(e).getDatafiles().keySet().stream())
+          .collect(Collectors.toUnmodifiableSet());
+    } else {
+      throw new IllegalArgumentException("Unknown session type " + session.getClass().getName());
+    }
+  }
+
   private void cleanUpReservedFiles(long expireTimeMs) {
 
     // Do a quick check to see if there is any potential work. This check is done to avoid acquiring
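
The ScanServer change lifts the per-session-type branching into a static helper so reserveFiles itself stays short. A sketch of that dispatch shape, with placeholder session types rather than the real ScanSession hierarchy:

    import java.util.Set;

    public class SessionFilesSketch {

      interface Session {}

      static class SingleSession implements Session {
        final Set<String> files;
        SingleSession(Set<String> files) { this.files = files; }
      }

      static class MultiSession implements Session {
        final Set<String> files;
        MultiSession(Set<String> files) { this.files = files; }
      }

      // dispatch on the concrete session type and fail loudly on anything unknown
      static Set<String> filesFor(Session session) {
        if (session instanceof SingleSession) {
          return Set.copyOf(((SingleSession) session).files);
        } else if (session instanceof MultiSession) {
          return Set.copyOf(((MultiSession) session).files);
        } else {
          throw new IllegalArgumentException("Unknown session type " + session.getClass().getName());
        }
      }

      public static void main(String[] args) {
        System.out.println(filesFor(new SingleSession(Set.of("f1", "f2"))));
      }
    }
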
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index 9dab5bbf86..4437e6b422 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -1202,33 +1202,27 @@ public class TabletClientHandler implements TabletClientService.Iface {
       throw new RuntimeException(e);
     }
 
-    ArrayList<Tablet> tabletsToFlush = new ArrayList<>();
-
     KeyExtent ke = new KeyExtent(TableId.of(tableId), ByteBufferUtil.toText(endRow),
         ByteBufferUtil.toText(startRow));
 
-    for (Tablet tablet : server.getOnlineTablets().values()) {
-      if (ke.overlaps(tablet.getExtent())) {
-        tabletsToFlush.add(tablet);
-      }
-    }
+    List<Tablet> tabletsToFlush = server.getOnlineTablets().values().stream()
+        .filter(tablet -> ke.overlaps(tablet.getExtent())).collect(toList());
 
-    Long flushID = null;
+    if (tabletsToFlush.isEmpty())
+      return; // no tablets to flush
 
-    for (Tablet tablet : tabletsToFlush) {
-      if (flushID == null) {
-        // read the flush id once from zookeeper instead of reading
-        // it for each tablet
-        try {
-          flushID = tablet.getFlushID();
-        } catch (NoNodeException e) {
-          // table was probably deleted
-          log.info("Asked to flush table that has no flush id {} {}", ke, e.getMessage());
-          return;
-        }
-      }
-      tablet.flush(flushID);
+    // read the flush id once from zookeeper instead of reading it for each tablet
+    final long flushID;
+    try {
+      Tablet firstTablet = tabletsToFlush.get(0);
+      flushID = firstTablet.getFlushID();
+    } catch (NoNodeException e) {
+      // table was probably deleted
+      log.info("Asked to flush table that has no flush id {} {}", ke, e.getMessage());
+      return;
     }
+
+    tabletsToFlush.forEach(tablet -> tablet.flush(flushID));
   }
 
   @Override
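
The TabletClientHandler.flush change returns early when nothing overlaps and reads the flush id once, from the first tablet, instead of re-checking a nullable Long inside the loop. A compact illustration of that hoisting, with a fake Tablet type:

    import java.util.List;

    public class FlushSketch {

      static class Tablet {
        private final long flushId;
        Tablet(long flushId) { this.flushId = flushId; }
        long getFlushId() { return flushId; }
        void flush(long id) { System.out.println("flushing with id " + id); }
      }

      public static void main(String[] args) {
        List<Tablet> tabletsToFlush = List.of(new Tablet(42), new Tablet(42));
        if (tabletsToFlush.isEmpty()) {
          return; // nothing to do, so never look up the flush id
        }
        // fetch the shared id once, then apply it to every tablet
        final long flushId = tabletsToFlush.get(0).getFlushId();
        tabletsToFlush.forEach(t -> t.flush(flushId));
      }
    }
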
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
index bf4f2ab725..5eb7ebfa1a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
@@ -95,9 +95,11 @@ public class LookupTask extends ScanTask<MultiScanResult> {
       // check the time so that the read ahead thread is not monopolized
       while (iter.hasNext() && bytesAdded < maxResultsSize
           && (System.currentTimeMillis() - startTime) < maxScanTime) {
-        Entry<KeyExtent,List<Range>> entry = iter.next();
-        KeyExtent extent = entry.getKey();
-        List<Range> ranges = entry.getValue();
+
+        final Entry<KeyExtent,List<Range>> entry = iter.next();
+        final KeyExtent extent = entry.getKey();
+        final List<Range> ranges = entry.getValue();
+
         iter.remove();
 
         // check that tablet server is serving requested tablet
@@ -112,9 +114,8 @@ public class LookupTask extends ScanTask<MultiScanResult> {
 
         try {
 
-          // do the following check to avoid a race condition
-          // between setting false below and the task being
-          // canceled
+          // do the following check to avoid a race condition between setting false below and the
+          // task being canceled
           if (isCancelled())
             interruptFlag.set(true);
 
@@ -127,10 +128,8 @@ public class LookupTask extends ScanTask<MultiScanResult> {
           // Add results from this Tablet.lookup() to the accumulated results
           results.addAll(tabletResults);
 
-          // if the tablet was closed it it possible that the
-          // interrupt flag was set.... do not want it set for
-          // the next
-          // lookup
+          // if the tablet was closed, it is possible that the interrupt flag was set.... do not
+          // want it set for the next lookup
           interruptFlag.set(false);
 
         } catch (IOException e) {
@@ -164,29 +163,12 @@ public class LookupTask extends ScanTask<MultiScanResult> {
       long finishTime = System.currentTimeMillis();
       session.totalLookupTime += (finishTime - startTime);
       session.numEntries += results.size();
+      boolean queriesIsEmpty = !session.queries.isEmpty();
 
-      // convert everything to thrift before adding result
-      List<TKeyValue> retResults = new ArrayList<>();
-      for (KVEntry entry : results)
-        retResults
-            .add(new TKeyValue(entry.getKey().toThrift(), ByteBuffer.wrap(entry.getValue().get())));
-      // @formatter:off
-      Map<TKeyExtent,List<TRange>> retFailures = failures.entrySet().stream().collect(Collectors.toMap(
-                      entry -> entry.getKey().toThrift(),
-                      entry -> entry.getValue().stream().map(Range::toThrift).collect(Collectors.toList())
-      ));
-      // @formatter:on
-      List<TKeyExtent> retFullScans =
-          fullScans.stream().map(KeyExtent::toThrift).collect(Collectors.toList());
-      TKeyExtent retPartScan = null;
-      TKey retPartNextKey = null;
-      if (partScan != null) {
-        retPartScan = partScan.toThrift();
-        retPartNextKey = partNextKey.toThrift();
-      }
       // add results to queue
-      addResult(new MultiScanResult(retResults, retFailures, retFullScans, retPartScan,
-          retPartNextKey, partNextKeyInclusive, !session.queries.isEmpty()));
+      MultiScanResult multiScanResult = getMultiScanResult(results, partScan, failures, fullScans,
+          partNextKey, partNextKeyInclusive, queriesIsEmpty);
+      addResult(multiScanResult);
     } catch (IterationInterruptedException iie) {
       if (!isCancelled()) {
         log.warn("Iteration interrupted, when scan not cancelled", iie);
@@ -202,4 +184,34 @@ public class LookupTask extends ScanTask<MultiScanResult> {
       runState.set(ScanRunState.FINISHED);
     }
   }
+
+  private MultiScanResult getMultiScanResult(List<KVEntry> results, KeyExtent partScan,
+      Map<KeyExtent,List<Range>> failures, List<KeyExtent> fullScans, Key partNextKey,
+      boolean partNextKeyInclusive, boolean queriesIsEmpty) {
+
+    // convert everything to thrift before adding result
+    List<TKeyValue> retResults = results.stream().map(
+        entry -> new TKeyValue(entry.getKey().toThrift(), ByteBuffer.wrap(entry.getValue().get())))
+        .collect(Collectors.toList());
+
+    // @formatter:off
+    Map<TKeyExtent,List<TRange>> retFailures = failures.entrySet().stream().collect(Collectors.toMap(
+            entry -> entry.getKey().toThrift(),
+            entry -> entry.getValue().stream().map(Range::toThrift).collect(Collectors.toList())
+    ));
+    // @formatter:on
+
+    List<TKeyExtent> retFullScans =
+        fullScans.stream().map(KeyExtent::toThrift).collect(Collectors.toList());
+
+    TKeyExtent retPartScan = null;
+    TKey retPartNextKey = null;
+    if (partScan != null) {
+      retPartScan = partScan.toThrift();
+      retPartNextKey = partNextKey.toThrift();
+    }
+
+    return new MultiScanResult(retResults, retFailures, retFullScans, retPartScan, retPartNextKey,
+        partNextKeyInclusive, queriesIsEmpty);
+  }
 }
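
The LookupTask change moves the thrift conversion into a helper; the failures map is rebuilt with Collectors.toMap plus a nested map over each range list. The same two-level conversion on plain strings, with toUpperCase standing in for the toThrift calls:

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class ConvertMapSketch {

      public static void main(String[] args) {
        Map<String,List<String>> failures = Map.of("e1", List.of("r1", "r2"), "e2", List.of("r3"));

        // convert every key and every element of every value list in one pass
        Map<String,List<String>> converted = failures.entrySet().stream()
            .collect(Collectors.toMap(
                e -> e.getKey().toUpperCase(),
                e -> e.getValue().stream().map(String::toUpperCase).collect(Collectors.toList())));

        System.out.println(converted); // {E1=[R1, R2], E2=[R3]} (order may vary)
      }
    }
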
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index e4660ea964..6e3d728b21 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -396,83 +396,96 @@ public class CompactableImpl implements Compactable {
           if (isCompactionStratConfigured)
             return Set.of();
 
-          switch (selectStatus) {
-            case NOT_ACTIVE:
-            case CANCELED: {
-              Set<StoredTabletFile> candidates = new HashSet<>(currFiles);
-              candidates.removeAll(allCompactingFiles);
-              return Collections.unmodifiableSet(candidates);
-            }
-            case NEW:
-            case SELECTING:
-              return Set.of();
-            case SELECTED: {
-              Set<StoredTabletFile> candidates = new HashSet<>(currFiles);
-              candidates.removeAll(allCompactingFiles);
-              if (getNanoTime() - selectedTimeNanos < selectedExpirationDuration.toNanos()) {
-                candidates.removeAll(selectedFiles);
-              }
-              return Collections.unmodifiableSet(candidates);
-            }
-            case RESERVED: {
-              Set<StoredTabletFile> candidates = new HashSet<>(currFiles);
-              candidates.removeAll(allCompactingFiles);
-              candidates.removeAll(selectedFiles);
-              return Collections.unmodifiableSet(candidates);
-            }
-            default:
-              throw new AssertionError();
-          }
+          return handleSystemCompaction(currFiles);
         }
         case SELECTOR:
           // intentional fall through
         case USER:
-          switch (selectStatus) {
-            case NOT_ACTIVE:
-            case NEW:
-            case SELECTING:
-            case CANCELED:
-              return Set.of();
-            case SELECTED:
-            case RESERVED: {
-              if (selectKind == kind) {
-                Set<StoredTabletFile> candidates = new HashSet<>(selectedFiles);
-                candidates.removeAll(allCompactingFiles);
-                candidates = Collections.unmodifiableSet(candidates);
-                // verify that candidates are still around and fail quietly if not
-                if (!currFiles.containsAll(candidates)) {
-                  log.debug("Selected files not in all files {} {}", candidates, currFiles);
-                  return Set.of();
-                }
-                return candidates;
-              } else {
-                return Set.of();
-              }
-            }
-            default:
-              throw new AssertionError();
-          }
+          return handleUserSelectorCompaction(currFiles, kind);
         case CHOP: {
-          switch (chopStatus) {
-            case NOT_ACTIVE:
-            case SELECTING:
-            case MARKING:
+          return handleChopCompaction(currFiles);
+        }
+        default:
+          throw new AssertionError();
+      }
+    }
+
+    private Set<StoredTabletFile> handleChopCompaction(Set<StoredTabletFile> currFiles) {
+      switch (chopStatus) {
+        case NOT_ACTIVE:
+        case SELECTING:
+        case MARKING:
+          return Set.of();
+        case SELECTED: {
+          if (selectStatus == FileSelectionStatus.NEW
+              || selectStatus == FileSelectionStatus.SELECTING)
+            return Set.of();
+
+          var filesToChop = getFilesToChop(currFiles);
+          filesToChop.removeAll(allCompactingFiles);
+          if (selectStatus == FileSelectionStatus.SELECTED
+              || selectStatus == FileSelectionStatus.RESERVED)
+            filesToChop.removeAll(selectedFiles);
+          return Collections.unmodifiableSet(filesToChop);
+        }
+        default:
+          throw new AssertionError();
+      }
+    }
+
+    private Set<StoredTabletFile> handleUserSelectorCompaction(Set<StoredTabletFile> currFiles,
+        CompactionKind kind) {
+      switch (selectStatus) {
+        case NOT_ACTIVE:
+        case NEW:
+        case SELECTING:
+        case CANCELED:
+          return Set.of();
+        case SELECTED:
+        case RESERVED: {
+          if (selectKind == kind) {
+            Set<StoredTabletFile> candidates = Sets.difference(selectedFiles, allCompactingFiles);
+            // verify that candidates are still around and fail quietly if not
+            if (!currFiles.containsAll(candidates)) {
+              log.debug("Selected files not in all files {} {}", candidates, currFiles);
               return Set.of();
-            case SELECTED: {
-              if (selectStatus == FileSelectionStatus.NEW
-                  || selectStatus == FileSelectionStatus.SELECTING)
-                return Set.of();
-
-              var filesToChop = getFilesToChop(currFiles);
-              filesToChop.removeAll(allCompactingFiles);
-              if (selectStatus == FileSelectionStatus.SELECTED
-                  || selectStatus == FileSelectionStatus.RESERVED)
-                filesToChop.removeAll(selectedFiles);
-              return Collections.unmodifiableSet(filesToChop);
             }
-            default:
-              throw new AssertionError();
+            // must create a copy because the sets passed to Sets.difference could change after this
+            // method returns
+            return Set.copyOf(candidates);
+          } else {
+            return Set.of();
+          }
+        }
+        default:
+          throw new AssertionError();
+      }
+    }
+
+    private Set<StoredTabletFile> handleSystemCompaction(Set<StoredTabletFile> currFiles) {
+      switch (selectStatus) {
+        case NOT_ACTIVE:
+        case CANCELED: {
+          // must create a copy because the sets passed to Sets.difference could change after this
+          // method returns
+          return Set.copyOf(Sets.difference(currFiles, allCompactingFiles));
+        }
+        case NEW:
+        case SELECTING:
+          return Set.of();
+        case SELECTED: {
+          Set<StoredTabletFile> candidates = new HashSet<>(currFiles);
+          candidates.removeAll(allCompactingFiles);
+          if (getNanoTime() - selectedTimeNanos < selectedExpirationDuration.toNanos()) {
+            candidates.removeAll(selectedFiles);
           }
+          return Collections.unmodifiableSet(candidates);
+        }
+        case RESERVED: {
+          Set<StoredTabletFile> candidates = new HashSet<>(currFiles);
+          candidates.removeAll(allCompactingFiles);
+          candidates.removeAll(selectedFiles);
+          return Collections.unmodifiableSet(candidates);
         }
         default:
           throw new AssertionError();
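
The CompactableImpl change relies on Sets.difference returning a live view, so the result is copied with Set.copyOf before it escapes the method (as the added comments note). A small demonstration of the view-versus-copy distinction; it assumes Guava on the classpath, which the class already uses:

    import java.util.HashSet;
    import java.util.Set;

    import com.google.common.collect.Sets;

    public class CandidatesSketch {

      public static void main(String[] args) {
        Set<String> currFiles = new HashSet<>(Set.of("f1", "f2", "f3"));
        Set<String> compacting = Set.of("f2");

        Set<String> view = Sets.difference(currFiles, compacting); // live view over currFiles
        Set<String> snapshot = Set.copyOf(view);                   // frozen at this point

        currFiles.remove("f1");
        System.out.println(view.contains("f1"));     // false - the view tracks currFiles
        System.out.println(snapshot.contains("f1")); // true  - the copy does not
      }
    }
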