You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2022/08/05 16:28:05 UTC

[GitHub] [accumulo] ctubbsii commented on a diff in pull request #2665: Eventually Consistent scans / ScanServer feature

ctubbsii commented on code in PR #2665:
URL: https://github.com/apache/accumulo/pull/2665#discussion_r938929295


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java:
##########
@@ -153,6 +162,56 @@ private static <T> Supplier<T> memoizeWithExpiration(Supplier<T> s) {
     return () -> Suppliers.memoizeWithExpiration(s::get, 100, MILLISECONDS).get();
   }
 
+  private ScanServerSelector createScanServerSelector() {
+    String clazz = ClientProperty.SCAN_SERVER_SELECTOR.getValue(info.getProperties());
+    try {
+      Class<? extends ScanServerSelector> impl =
+          Class.forName(clazz).asSubclass(ScanServerSelector.class);
+      ScanServerSelector scanServerSelector = impl.getDeclaredConstructor().newInstance();
+

Review Comment:
   I thought we had a centralized utility class for loading classes from the configuration. This is probably redundant, but I haven't dug to find it.



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java:
##########
@@ -153,6 +162,56 @@ private static <T> Supplier<T> memoizeWithExpiration(Supplier<T> s) {
     return () -> Suppliers.memoizeWithExpiration(s::get, 100, MILLISECONDS).get();
   }
 
+  private ScanServerSelector createScanServerSelector() {
+    String clazz = ClientProperty.SCAN_SERVER_SELECTOR.getValue(info.getProperties());
+    try {
+      Class<? extends ScanServerSelector> impl =
+          Class.forName(clazz).asSubclass(ScanServerSelector.class);
+      ScanServerSelector scanServerSelector = impl.getDeclaredConstructor().newInstance();
+
+      Map<String,String> sserverProps = new HashMap<>();
+      ClientProperty
+          .getPrefix(info.getProperties(), ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey())
+          .forEach((k, v) -> {
+            sserverProps.put(
+                k.toString()
+                    .substring(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey().length()),
+                v.toString());
+          });

Review Comment:
   Creating a map, then using forEach to populate it is a bit more clunky than using a stream map collector to construct the map directly from the iteration.



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.TabletFile;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.tserver.InMemoryMap;
+import org.apache.accumulo.tserver.TabletHostingServer;
+import org.apache.accumulo.tserver.TabletServerResourceManager;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A tablet that can not be written to and operates off of a snapshot of tablet metadata for its
+ * entire lifetime. The set of files and therefore the data backing this tablet will never change
+ * for its lifetime.
+ */
+public class SnapshotTablet extends TabletBase {
+
+  private static final Logger log = LoggerFactory.getLogger(SnapshotTablet.class);
+
+  private final TabletHostingServer server;
+  private final SortedMap<StoredTabletFile,DataFileValue> files;
+  private final TabletServerResourceManager.TabletResourceManager tabletResources;
+  private boolean closed = false;
+  private final AtomicLong dataSourceDeletions = new AtomicLong(0);
+
+  public SnapshotTablet(TabletHostingServer server, TabletMetadata metadata,
+      TabletServerResourceManager.TabletResourceManager tabletResources) {
+    super(server, metadata.getExtent());
+    this.server = server;
+    this.files = Collections.unmodifiableSortedMap(new TreeMap<>(metadata.getFilesMap()));
+    this.tabletResources = tabletResources;
+  }
+
+  @Override
+  public synchronized boolean isClosed() {
+    return closed;
+  }
+
+  @Override
+  public SortedMap<StoredTabletFile,DataFileValue> getDatafiles() {
+    return files;
+  }
+
+  @Override
+  public void addToYieldMetric(int i) {
+    this.server.getScanMetrics().addYield(i);
+  }
+
+  @Override
+  public long getDataSourceDeletions() {
+    return dataSourceDeletions.get();
+  }
+
+  @Override
+  TabletServerResourceManager.TabletResourceManager getTabletResources() {
+    return tabletResources;
+  }
+
+  @Override
+  public List<InMemoryMap.MemoryIterator> getMemIterators(SamplerConfigurationImpl samplerConfig) {
+    return List.of();
+  }
+
+  @Override
+  public void returnMemIterators(List<InMemoryMap.MemoryIterator> iters) {
+
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public Pair<Long,Map<TabletFile,DataFileValue>> reserveFilesForScan() {
+    return new Pair(0L, getDatafiles());
+  }

Review Comment:
   It doesn't make sense to suppress a warning when it can just be fixed by adding the appropriate generics.



##########
server/base/src/main/java/org/apache/accumulo/server/util/ScanServerMetadataEntries.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+
+public class ScanServerMetadataEntries {

Review Comment:
   This main class has no class-level Javadoc explaining what the utility is supposed to do.



##########
core/src/main/java/org/apache/accumulo/core/spi/scan/DefaultScanServerSelector.java:
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.lang.reflect.Type;
+import java.security.SecureRandom;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.data.TabletId;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hashing;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * The default Accumulo selector for scan servers. This selector will:
+ *
+ * <ul>
+ * <li>Hash each tablet to a per attempt configurable number of scan servers and then randomly
+ * choose one of those scan servers. Using hashing allows different clients to select the same scan
+ * servers for a given tablet.</li>
+ * <li>Use a per attempt configurable busy timeout.</li>
+ * </ul>
+ *
+ * <p>
+ * This class accepts a single configuration that has a json value. To configure this class set
+ * {@code scan.server.selector.opts.profiles=<json>} in the accumulo client configuration along with
+ * the config for the class. The following is the default configuration value.
+ * </p>
+ * <p>
+ * {@value DefaultScanServerSelector#PROFILES_DEFAULT}
+ * </p>
+ *
+ * The json is structured as a list of profiles, with each profile having the following fields.
+ *
+ * <ul>
+ * <li><b>isDefault : </b> A boolean that specifies whether this is the default profile. One and
+ * only one profile must set this to true.</li>
+ * <li><b>maxBusyTimeout : </b> The maximum busy timeout to use. The busy timeout from the last
+ * attempt configuration grows exponentially up to this max.</li>
+ * <li><b>scanTypeActivations : </b> A list of scan types that will activate this profile. Scan
+ * types are specified by setting {@code scan_type=<scan_type>} as an execution hint on the scanner. See
+ * {@link org.apache.accumulo.core.client.ScannerBase#setExecutionHints(Map)}</li>
+ * <li><b>group : </b> Scan servers can be started with an optional group. If specified, this option
+ * will limit the scan servers used to those that were started with this group name. If not
+ * specified, the set of scan servers that did not specify a group will be used. Grouping scan
+ * servers supports at least two use cases. First groups can be used to dedicate resources for
+ * certain scans. Second groups can be used to have different hardware/VM types for scans, for
+ * example, some scans could use expensive high-memory VMs and others cheaper burstable VMs.</li>
+ * <li><b>attemptPlans : </b> A list of configuration to use for each scan attempt. Each list object
+ * has the following fields:
+ * <ul>
+ * <li><b>servers : </b> The number of servers to randomly choose from for this attempt.</li>
+ * <li><b>busyTimeout : </b> The busy timeout to use for this attempt.</li>
+ * <li><b>salt : </b> An optional string to append when hashing the tablet. When this is set
+ * differently for attempts it has the potential to cause the set of servers chosen from to be
+ * disjoint. When not set or the same, the servers between attempts will be subsets.</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * <p>
+ * Below is an example configuration with two profiles, one is the default and the other is used
+ * when the scan execution hint {@code scan_type=slow} is set.
+ * </p>
+ *
+ * <pre>
+ *    [
+ *     {
+ *       "isDefault":true,
+ *       "maxBusyTimeout":"5m",
+ *       "busyTimeoutMultiplier":4,
+ *       "attemptPlans":[
+ *         {"servers":"3", "busyTimeout":"33ms"},
+ *         {"servers":"100%", "busyTimeout":"100ms"}
+ *       ]
+ *     },
+ *     {
+ *       "scanTypeActivations":["slow"],
+ *       "maxBusyTimeout":"20m",
+ *       "busyTimeoutMultiplier":8,
+ *       "group":"lowcost",
+ *       "attemptPlans":[
+ *         {"servers":"1", "busyTimeout":"10s"},
+ *         {"servers":"3", "busyTimeout":"30s","salt":"42"},
+ *         {"servers":"9", "busyTimeout":"60s","salt":"84"}
+ *       ]
+ *     }
+ *    ]
+ * </pre>
+ *
+ * <p>
+ * For the default profile in the example it will start off by choosing randomly from 3 scan servers
+ * based on a hash of the tablet with no salt. For the first attempt it will use a busy timeout of
+ * 33 milliseconds. If the first attempt returns with busy, then it will randomly choose from 100%
+ * or all servers for the second attempt and use a busy timeout of 100ms. For subsequent attempts it
+ * will keep choosing from all servers and start multiplying the busy timeout by 4 until the max
+ * busy timeout of 5 minutes is reached.
+ * </p>
+ *
+ * <p>
+ * For the profile activated by {@code scan_type=slow} it starts off by choosing randomly from 1 scan
+ * server based on a hash of the tablet with no salt and a busy timeout of 10s. The second attempt
+ * will choose from 3 scan servers based on a hash of the tablet plus the salt {@literal 42}.
+ * Without the salt, the single scan server from the first attempt would always be included in the
+ * set of 3. With the salt the single scan server from the first attempt may not be included. The
+ * third attempt will choose a scan server from 9 using the salt {@literal 84} and a busy timeout of
+ * 60s. The different salt means the set of servers that attempts 2 and 3 choose from may be
+ * disjoint. Attempt 4 and greater will continue to choose from the same 9 servers as attempt 3 and
+ * will keep increasing the busy timeout by multiplying 8 until the maximum of 20 minutes is
+ * reached. For this profile it will choose from scan servers in the group {@literal lowcost}.
+ * </p>
+ */
+public class DefaultScanServerSelector implements ScanServerSelector {

Review Comment:
   This class can be configured as the default, but it shouldn't be *named* "Default". If it ever becomes *not* the default, then having it still named "Default" creates a lot of confusion. It should be named after its general strategy.
   
   For example, git's default merge strategy is called "recursive", not "default".



##########
server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java:
##########
@@ -266,4 +273,71 @@ public Stream<ExternalCompactionFinalState> getExternalCompactionFinalStates() {
       throw new RuntimeException(e);
     }
   }
+
+  @Override
+  public void putScanServerFileReferences(Collection<ScanServerRefTabletFile> scanRefs) {
+    try (BatchWriter writer = context.createBatchWriter(DataLevel.USER.metaTable())) {
+      String prefix = ScanServerFileReferenceSection.getRowPrefix();
+      for (ScanServerRefTabletFile ref : scanRefs) {
+        Mutation m = new Mutation(prefix + ref.getRowSuffix());
+        m.put(ref.getServerAddress(), ref.getServerLockUUID(), ref.getValue());
+        writer.addMutation(m);
+      }
+    } catch (MutationsRejectedException | TableNotFoundException e) {
+      throw new RuntimeException(e);

Review Comment:
   here too



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImpl.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.accumulo.core.data.TabletId;
+import org.apache.accumulo.core.spi.scan.ScanServerDispatcher.ScanAttempt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Maps;
+
+public class ScanAttemptsImpl {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanAttemptsImpl.class);
+
+  static class ScanAttemptImpl
+      implements org.apache.accumulo.core.spi.scan.ScanServerDispatcher.ScanAttempt {
+
+    private final String server;
+    private final long time;
+    private final Result result;
+    private volatile long mutationCount = Long.MAX_VALUE;
+
+    ScanAttemptImpl(Result result, String server, long time) {
+      this.result = result;
+      this.server = Objects.requireNonNull(server);
+      this.time = time;
+    }
+
+    @Override
+    public String getServer() {
+      return server;
+    }
+
+    @Override
+    public long getEndTime() {
+      return time;
+    }
+
+    @Override
+    public Result getResult() {
+      return result;
+    }
+
+    private void setMutationCount(long mc) {
+      this.mutationCount = mc;
+    }
+
+    public long getMutationCount() {
+      return mutationCount;
+    }
+  }
+
+  private Map<TabletId,Collection<ScanAttemptImpl>> attempts = new ConcurrentHashMap<>();
+  private long mutationCounter = 0;
+
+  private void add(TabletId tablet, ScanAttempt.Result result, String server, long endTime) {
+
+    ScanAttemptImpl sa = new ScanAttemptImpl(result, server, endTime);
+
+    attempts.computeIfAbsent(tablet, k -> ConcurrentHashMap.newKeySet()).add(sa);
+
+    synchronized (this) {
+      // now that the scan attempt obj is added to all concurrent data structs, make it visible
+      // need to atomically increment the counter AND set the counter on the object
+      sa.setMutationCount(mutationCounter++);
+    }
+
+  }
+
+  public static interface ScanAttemptReporter {
+    void report(ScanAttempt.Result result);
+  }
+
+  ScanAttemptReporter createReporter(String server, TabletId tablet) {
+    return new ScanAttemptReporter() {
+      @Override
+      public void report(ScanAttempt.Result result) {
+        LOG.trace("Received result: {}", result);
+        add(tablet, result, server, System.currentTimeMillis());
+      }
+    };
+  }
+
+  Map<TabletId,Collection<ScanAttemptImpl>> snapshot() {
+    // allows only seeing scan attempt objs that were added before this call
+
+    long snapMC;
+    synchronized (ScanAttemptsImpl.this) {
+      snapMC = mutationCounter;
+    }
+    var tmp = Maps.transformValues(attempts, tabletAttemptList -> Collections2
+        .filter(tabletAttemptList, sai -> sai.getMutationCount() < snapMC));
+
+    return Maps.filterEntries(tmp, entry -> !entry.getValue().isEmpty());

Review Comment:
   This comment is still applicable.



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java:
##########
@@ -332,6 +392,51 @@ public synchronized BatchWriterConfig getBatchWriterConfig() {
     return batchWriterConfig;
   }
 
+  public static class ScanServerInfo {
+    public final UUID uuid;
+    public final String group;
+
+    public ScanServerInfo(UUID uuid, String group) {
+      this.uuid = uuid;
+      this.group = group;
+    }
+
+  }
+
+  /**
+   * @return map of live scan server addresses to lock uuids.
+   */
+  public Map<String,ScanServerInfo> getScanServers() {
+    Map<String,ScanServerInfo> liveScanServers = new HashMap<>();
+    String root = this.getZooKeeperRoot() + Constants.ZSSERVERS;
+    var addrs = this.getZooCache().getChildren(root);
+    for (String addr : addrs) {
+      try {
+        final var zLockPath = ServiceLock.path(root + "/" + addr);
+        ZcStat stat = new ZcStat();
+        byte[] lockData = ServiceLock.getLockData(getZooCache(), zLockPath, stat);
+        if (lockData != null) {
+          String[] fields = new String(lockData, UTF_8).split(",", 2);
+          UUID uuid = UUID.fromString(fields[0]);
+          String group = fields[1];
+          liveScanServers.put(addr, new ScanServerInfo(uuid, group));
+        }
+      } catch (Exception e) {
+        log.error("Error validating zookeeper scan server node: " + addr, e);

Review Comment:
   Overly broad exception catching, especially since they're merely logged and continued. Could let RTEs fall through and only catch checked exceptions.



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientServiceEnvironmentImpl.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.util.ConfigurationImpl;
+
+public class ClientServiceEnvironmentImpl implements ServiceEnvironment {
+
+  private final ClientContext context;
+
+  public ClientServiceEnvironmentImpl(ClientContext context) {
+    this.context = context;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    try {
+      return new ConfigurationImpl(
+          new ConfigurationCopy(context.instanceOperations().getSystemConfiguration()));
+    } catch (Exception e) {
+      throw new RuntimeException("Error getting system configuration", e);
+    }
+  }
+
+  @Override
+  public Configuration getConfiguration(TableId tableId) {
+    try {
+      return new ConfigurationImpl(
+          new ConfigurationCopy(context.tableOperations().getConfiguration(getTableName(tableId))));
+    } catch (Exception e) {
+      throw new RuntimeException("Error getting table configuration", e);
+    }

Review Comment:
   Here too: overly broad exception catching, rethrown as a generic RTE.



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java:
##########
@@ -127,9 +154,6 @@ public T get(long timeout, TimeUnit unit)
     // returned
     resultQueue = null;
 
-    if (r instanceof Error)
-      throw (Error) r; // don't wrap an Error
-

Review Comment:
   Why was this removed? I think this will prevent the OOM handler from detecting OOMs thrown from threads.



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientServiceEnvironmentImpl.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.util.ConfigurationImpl;
+
+public class ClientServiceEnvironmentImpl implements ServiceEnvironment {
+
+  private final ClientContext context;
+
+  public ClientServiceEnvironmentImpl(ClientContext context) {
+    this.context = context;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    try {
+      return new ConfigurationImpl(
+          new ConfigurationCopy(context.instanceOperations().getSystemConfiguration()));
+    } catch (Exception e) {
+      throw new RuntimeException("Error getting system configuration", e);

Review Comment:
   Please avoid overly broad exception catching, and use more specific exception types than a generic RTE when throwing.



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java:
##########
@@ -332,6 +392,51 @@ public synchronized BatchWriterConfig getBatchWriterConfig() {
     return batchWriterConfig;
   }
 
+  public static class ScanServerInfo {
+    public final UUID uuid;
+    public final String group;
+
+    public ScanServerInfo(UUID uuid, String group) {
+      this.uuid = uuid;
+      this.group = group;
+    }
+
+  }

Review Comment:
   This class doesn't need to be an inner class, and could either be its own class or just be a `Pair<UUID, String>`



##########
test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java:
##########
@@ -142,6 +143,9 @@ private void doWorkToGenerateMetrics() throws Exception {
         writer.addMutation(m);
       }
       client.tableOperations().compact(tableName, new CompactionConfig());
+      try (Scanner scanner = client.createScanner(tableName)) {
+        scanner.forEach((k, v) -> {});
+      }

Review Comment:
   How do we know this returned anything? Should we count these results?



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java:
##########
@@ -0,0 +1,987 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
+import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
+import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.ScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.TColumn;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TRange;
+import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
+import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.spi.scan.ScanServerSelector;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
+import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
+import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ServiceLock;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.AbstractServer;
+import org.apache.accumulo.server.GarbageCollectionLogger;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
+import org.apache.accumulo.server.security.SecurityUtil;
+import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
+import org.apache.accumulo.tserver.session.MultiScanSession;
+import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.session.ScanSession.TabletResolver;
+import org.apache.accumulo.tserver.session.Session;
+import org.apache.accumulo.tserver.session.SessionManager;
+import org.apache.accumulo.tserver.session.SingleScanSession;
+import org.apache.accumulo.tserver.tablet.SnapshotTablet;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.zookeeper.KeeperException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+public class ScanServer extends AbstractServer
+    implements TabletScanClientService.Iface, TabletHostingServer {
+
+  public static class ScanServerOpts extends ServerOpts {
+    @Parameter(required = false, names = {"-g", "--group"},
+        description = "Optional group name that will be made available to the ScanServerSelector client plugin.  If not specified will be set to '"
+            + ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME
+            + "'.  Groups support at least two use cases : dedicating resources to scans and/or using different hardware for scans.")
+    private String groupName = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
+
+    public String getGroupName() {
+      return groupName;
+    }
+  }
+
+  private static final Logger log = LoggerFactory.getLogger(ScanServer.class);
+
+  private static class TabletMetadataLoader implements CacheLoader<KeyExtent,TabletMetadata> {
+
+    private final Ample ample;
+
+    private TabletMetadataLoader(Ample ample) {
+      this.ample = ample;
+    }
+
+    @Override
+    public @Nullable TabletMetadata load(KeyExtent keyExtent) {
+      long t1 = System.currentTimeMillis();
+      var tm = ample.readTablet(keyExtent);
+      long t2 = System.currentTimeMillis();
+      LOG.trace("Read metadata for 1 tablet in {} ms", t2 - t1);
+      return tm;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")

Review Comment:
   This suppression could be made more narrow by putting it on a variable assignment rather than the whole method, if it's necessary at all.



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java:
##########
@@ -153,6 +162,56 @@ private static <T> Supplier<T> memoizeWithExpiration(Supplier<T> s) {
     return () -> Suppliers.memoizeWithExpiration(s::get, 100, MILLISECONDS).get();
   }
 
+  private ScanServerSelector createScanServerSelector() {
+    String clazz = ClientProperty.SCAN_SERVER_SELECTOR.getValue(info.getProperties());
+    try {
+      Class<? extends ScanServerSelector> impl =
+          Class.forName(clazz).asSubclass(ScanServerSelector.class);
+      ScanServerSelector scanServerSelector = impl.getDeclaredConstructor().newInstance();
+
+      Map<String,String> sserverProps = new HashMap<>();
+      ClientProperty
+          .getPrefix(info.getProperties(), ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey())
+          .forEach((k, v) -> {
+            sserverProps.put(
+                k.toString()
+                    .substring(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey().length()),
+                v.toString());
+          });
+
+      scanServerSelector.init(new ScanServerSelector.InitParameters() {
+        @Override
+        public Map<String,String> getOptions() {
+          return Collections.unmodifiableMap(sserverProps);
+        }
+
+        @Override
+        public ServiceEnvironment getServiceEnv() {
+          return new ClientServiceEnvironmentImpl(ClientContext.this);
+        }
+
+        @Override
+        public Supplier<Collection<ScanServer>> getScanServers() {
+          return () -> ClientContext.this.getScanServers().entrySet().stream()
+              .map(entry -> new ScanServer() {
+                @Override
+                public String getAddress() {
+                  return entry.getKey();
+                }
+
+                @Override
+                public String getGroup() {
+                  return entry.getValue().group;
+                }
+              }).collect(Collectors.toSet());
+        }
+      });
+      return scanServerSelector;
+    } catch (Exception e) {

Review Comment:
   This exception is a bit broad. Could just catch the non-RTEs with a multi-catch, and let the RTEs fall through rather than be caught and re-thrown as an RTE wrapping another RTE.



##########
server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java:
##########
@@ -176,6 +178,23 @@ public void execute(String[] args) throws Exception {
 
       }
 
+      if (opts.zapScanServers) {
+        String sserversPath = Constants.ZROOT + "/" + iid + Constants.ZSSERVERS;
+        try {
+          List<String> children = zoo.getChildren(sserversPath);
+          for (String child : children) {
+            message("Deleting " + sserversPath + "/" + child + " from zookeeper", opts);
+
+            var zLockPath = ServiceLock.path(sserversPath + "/" + child);
+            if (!zoo.getChildren(zLockPath.toString()).isEmpty()) {
+              ServiceLock.deleteLock(zoo, zLockPath);
+            }
+          }
+        } catch (Exception e) {
+          log.error("{}", e.getMessage(), e);

Review Comment:
   This `catch (Exception e)` is overly broad, and the exception is swallowed after being logged.



##########
core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheManagerTest.java:
##########
@@ -27,6 +27,7 @@
 public class BlockCacheManagerTest {
 
   @Test
+  @SuppressWarnings("deprecation")

Review Comment:
   What is being deprecated in this PR? This feature is being added as experimental. New experimental features don't typically cause new deprecations.



##########
server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java:
##########
@@ -266,4 +273,71 @@ public Stream<ExternalCompactionFinalState> getExternalCompactionFinalStates() {
       throw new RuntimeException(e);
     }
   }
+
+  /**
+   * Writes one mutation per scan server file reference to the metadata table of the user data
+   * level. Each row is the scan-server-reference row prefix followed by the reference's row
+   * suffix; the column carries the server address, the server lock UUID and the reference value.
+   *
+   * @param scanRefs
+   *          references to persist
+   * @throws RuntimeException
+   *           wrapping a rejected mutation or a missing metadata table
+   */
+  @Override
+  public void putScanServerFileReferences(Collection<ScanServerRefTabletFile> scanRefs) {
+    try (BatchWriter refWriter = context.createBatchWriter(DataLevel.USER.metaTable())) {
+      final String rowPrefix = ScanServerFileReferenceSection.getRowPrefix();
+      for (ScanServerRefTabletFile scanRef : scanRefs) {
+        final String row = rowPrefix + scanRef.getRowSuffix();
+        Mutation refMutation = new Mutation(row);
+        refMutation.put(scanRef.getServerAddress(), scanRef.getServerLockUUID(),
+            scanRef.getValue());
+        refWriter.addMutation(refMutation);
+      }
+    } catch (MutationsRejectedException | TableNotFoundException writeFailure) {
+      throw new RuntimeException(writeFailure);
+    }
+  }
+
+  @Override
+  public Stream<ScanServerRefTabletFile> getScanServerFileReferences() {
+    try {
+      Scanner scanner = context.createScanner(DataLevel.USER.metaTable(), Authorizations.EMPTY);
+      scanner.setRange(ScanServerFileReferenceSection.getRange());
+      int pLen = ScanServerFileReferenceSection.getRowPrefix().length();
+      return StreamSupport.stream(scanner.spliterator(), false)
+          .map(e -> new ScanServerRefTabletFile(e.getKey().getRowData().toString().substring(pLen),
+              e.getKey().getColumnFamily(), e.getKey().getColumnQualifier()));
+    } catch (TableNotFoundException e) {
+      throw new RuntimeException(e);

Review Comment:
   Try to throw a more specific `RuntimeException` subclass (RTE) here instead of wrapping in a bare `RuntimeException`.



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java:
##########
@@ -0,0 +1,987 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
+import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
+import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.ScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.TColumn;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TRange;
+import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
+import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.spi.scan.ScanServerSelector;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
+import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
+import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ServiceLock;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.AbstractServer;
+import org.apache.accumulo.server.GarbageCollectionLogger;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
+import org.apache.accumulo.server.security.SecurityUtil;
+import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
+import org.apache.accumulo.tserver.session.MultiScanSession;
+import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.session.ScanSession.TabletResolver;
+import org.apache.accumulo.tserver.session.Session;
+import org.apache.accumulo.tserver.session.SessionManager;
+import org.apache.accumulo.tserver.session.SingleScanSession;
+import org.apache.accumulo.tserver.tablet.SnapshotTablet;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.zookeeper.KeeperException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+public class ScanServer extends AbstractServer
+    implements TabletScanClientService.Iface, TabletHostingServer {
+
+  /**
+   * Command line options for the scan server. Adds an optional {@code -g} / {@code --group}
+   * argument on top of the options inherited from {@link ServerOpts}.
+   */
+  public static class ScanServerOpts extends ServerOpts {
+    @Parameter(required = false, names = {"-g", "--group"},
+        description = "Optional group name that will be made available to the ScanServerSelector client plugin.  If not specified will be set to '"
+            + ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME
+            + "'.  Groups support at least two use cases : dedicating resources to scans and/or using different hardware for scans.")
+    private String groupName = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
+
+    /**
+     * @return the group name given on the command line, or the default scan server group name when
+     *         the option was not supplied
+     */
+    public String getGroupName() {
+      return groupName;
+    }
+  }
+
+  private static final Logger log = LoggerFactory.getLogger(ScanServer.class);
+
+  /**
+   * Cache loader that reads tablet metadata through {@link Ample}, either for a single extent or
+   * for a batch of extents, and trace-logs how long each read took.
+   */
+  private static class TabletMetadataLoader implements CacheLoader<KeyExtent,TabletMetadata> {
+
+    private final Ample ample;
+
+    private TabletMetadataLoader(Ample ample) {
+      this.ample = ample;
+    }
+
+    /**
+     * Reads the metadata of one tablet.
+     *
+     * @param keyExtent
+     *          extent of the tablet to read
+     * @return the result of {@code ample.readTablet(keyExtent)}; declared nullable
+     */
+    @Override
+    public @Nullable TabletMetadata load(KeyExtent keyExtent) {
+      long t1 = System.currentTimeMillis();
+      var tm = ample.readTablet(keyExtent);
+      long t2 = System.currentTimeMillis();
+      LOG.trace("Read metadata for 1 tablet in {} ms", t2 - t1);
+      return tm;
+    }
+
+    /**
+     * Reads the metadata of several tablets in one request.
+     *
+     * @param keys
+     *          extents of the tablets to read
+     * @return map from extent to metadata for every tablet the read returned
+     */
+    @Override
+    public Map<? extends KeyExtent,? extends TabletMetadata>
+        loadAll(Set<? extends KeyExtent> keys) {
+      long t1 = System.currentTimeMillis();
+      // The suppression is scoped to this one cast instead of the whole method. Every element of
+      // keys is a KeyExtent, so viewing the set as Collection<KeyExtent> is fine as long as the
+      // callee only reads from it (presumably true for forTablets - confirm).
+      @SuppressWarnings("unchecked")
+      Collection<KeyExtent> extents = (Collection<KeyExtent>) keys;
+      // NOTE(review): the object returned by build() is never closed here; if it holds a scanner
+      // it may need try-with-resources - confirm against its declaration.
+      var tms = ample.readTablets().forTablets(extents).build().stream()
+          .collect(Collectors.toMap(tm -> tm.getExtent(), tm -> tm));
+      long t2 = System.currentTimeMillis();
+      LOG.trace("Read metadata for {} tablets in {} ms", keys.size(), t2 - t1);
+      return tms;
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);
+
+  protected ThriftScanClientHandler delegate;
+  private UUID serverLockUUID;
+  private final TabletMetadataLoader tabletMetadataLoader;
+  private final LoadingCache<KeyExtent,TabletMetadata> tabletMetadataCache;
+  // tracks file reservations that are in the process of being added or removed from the metadata
+  // table
+  private final Set<StoredTabletFile> influxFiles = new HashSet<>();
+  // a read lock that ensures files are not removed from reservedFiles while its held
+  private final ReentrantReadWriteLock.ReadLock reservationsReadLock;
+  // a write lock that must be held when mutating influxFiles or when removing entries from
+  // reservedFiles
+  private final ReentrantReadWriteLock.WriteLock reservationsWriteLock;
+  // this condition is used to signal changes to influxFiles
+  private final Condition reservationCondition;
+  // the key is the set of files that have reservations in the metadata table, the value contains
+  // information about which scans are currently using the file
+  private final Map<StoredTabletFile,ReservedFile> reservedFiles = new ConcurrentHashMap<>();
+  private final AtomicLong nextScanReservationId = new AtomicLong();
+
+  private final ServerContext context;
+  private final SessionManager sessionManager;
+  private final TabletServerResourceManager resourceManager;
+  HostAndPort clientAddress;
+  private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
+
+  private volatile boolean serverStopRequested = false;
+  private ServiceLock scanServerLock;
+  protected TabletServerScanMetrics scanMetrics;
+
+  private ZooCache managerLockCache;
+
+  private final String groupName;
+
+  /**
+   * Creates the scan server: sets up the session and resource managers, the locks guarding file
+   * reservations, the optional tablet metadata cache, the scan handler delegate, and a recurring
+   * task that cleans up expired file reservations.
+   *
+   * @param opts
+   *          parsed command line options; supplies the (non-null) group name
+   * @param args
+   *          raw command line arguments, passed to the superclass
+   */
+  public ScanServer(ScanServerOpts opts, String[] args) {
+    super("sserver", opts, args);
+
+    context = super.getContext();
+    log.info("Version " + Constants.VERSION);
+    log.info("Instance " + getContext().getInstanceID());
+    this.sessionManager = new SessionManager(context);
+
+    this.resourceManager = new TabletServerResourceManager(context, this);
+
+    this.managerLockCache = new ZooCache(context.getZooReader(), null);
+
+    // the read lock, write lock and condition all come from the same ReentrantReadWriteLock; the
+    // condition is bound to the write lock
+    var readWriteLock = new ReentrantReadWriteLock();
+    reservationsReadLock = readWriteLock.readLock();
+    reservationsWriteLock = readWriteLock.writeLock();
+    reservationCondition = readWriteLock.writeLock().newCondition();
+
+    // Note: The way to control the number of concurrent scans that a ScanServer will
+    // perform is by using Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS or the number
+    // of threads in Property.SSERV_SCAN_EXECUTORS_PREFIX.
+
+    long cacheExpiration =
+        getConfiguration().getTimeInMillis(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION);
+
+    long scanServerReservationExpiration =
+        getConfiguration().getTimeInMillis(Property.SSERVER_SCAN_REFERENCE_EXPIRATION_TIME);
+
+    tabletMetadataLoader = new TabletMetadataLoader(getContext().getAmple());
+
+    // an expiration of zero disables the cache entirely; metadata is then read through the loader
+    // on every request
+    if (cacheExpiration == 0L) {
+      LOG.warn("Tablet metadata caching disabled, may cause excessive scans on metadata table.");
+      tabletMetadataCache = null;
+    } else {
+      if (cacheExpiration < 60000) {
+        LOG.warn(
+            "Tablet metadata caching less than one minute, may cause excessive scans on metadata table.");
+      }
+      tabletMetadataCache =
+          Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS)
+              .scheduler(Scheduler.systemScheduler()).build(tabletMetadataLoader);
+    }
+
+    delegate = newThriftScanClientHandler(new WriteTracker());
+
+    this.groupName = Objects.requireNonNull(opts.getGroupName());
+
+    // the reservation expiration time is used both as the cleanup period and as the idle time
+    // after which an unused file reservation may be removed
+    ThreadPools.watchCriticalScheduledTask(getContext().getScheduledExecutor()
+        .scheduleWithFixedDelay(() -> cleanUpReservedFiles(scanServerReservationExpiration),
+            scanServerReservationExpiration, scanServerReservationExpiration,
+            TimeUnit.MILLISECONDS));
+
+  }
+
+  /**
+   * Creates the handler that this server delegates scan calls to. Kept as a separate, overridable
+   * method so that tests can substitute their own handler.
+   *
+   * @param writeTracker
+   *          write tracker handed to the new handler
+   * @return a new handler bound to this server
+   */
+  @VisibleForTesting
+  protected ThriftScanClientHandler newThriftScanClientHandler(WriteTracker writeTracker) {
+    return new ThriftScanClientHandler(this, writeTracker);
+  }
+
+  /**
+   * Start the thrift service that handles incoming client scan requests.
+   *
+   * @return address of this client service
+   * @throws UnknownHostException
+   *           host unknown
+   */
+  protected ServerAddress startScanServerClientService() throws UnknownHostException {
+
+    // This class implements TabletScanClientService.Iface and then delegates calls. Be sure
+    // to set up the ThriftProcessor using this class, not the delegate.
+    TProcessor processor = ThriftProcessorTypes.getScanServerTProcessor(this, getContext());
+
+    // use the scan server specific max message size when it is configured, otherwise fall back to
+    // the general setting
+    Property maxMessageSizeProperty =
+        (getConfiguration().get(Property.SSERV_MAX_MESSAGE_SIZE) != null
+            ? Property.SSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
+    ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
+        Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(),
+        "Thrift Client Server", Property.SSERV_PORTSEARCH, Property.SSERV_MINTHREADS,
+        Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK, maxMessageSizeProperty);
+
+    LOG.info("address = {}", sp.address);
+    return sp;
+  }
+
+  /**
+   * @return the client address formatted as {@code host:port}, or {@code null} while the client
+   *         address has not been set
+   */
+  public String getClientAddressString() {
+    return clientAddress == null ? null
+        : clientAddress.getHost() + ":" + clientAddress.getPort();
+  }
+
+  /**
+   * Set up nodes and locks in ZooKeeper for this Compactor
+   */
+  private ServiceLock announceExistence() {
+    ZooReaderWriter zoo = getContext().getZooReaderWriter();
+    try {
+
+      var zLockPath = ServiceLock.path(
+          getContext().getZooKeeperRoot() + Constants.ZSSERVERS + "/" + getClientAddressString());
+
+      try {
+        // Old zk nodes can be cleaned up by ZooZap
+        zoo.putPersistentData(zLockPath.toString(), new byte[] {}, NodeExistsPolicy.SKIP);
+      } catch (KeeperException e) {
+        if (e.code() == KeeperException.Code.NOAUTH) {
+          LOG.error("Failed to write to ZooKeeper. Ensure that"
+              + " accumulo.properties, specifically instance.secret, is consistent.");
+        }
+        throw e;
+      }
+
+      serverLockUUID = UUID.randomUUID();
+      scanServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, serverLockUUID);
+
+      LockWatcher lw = new LockWatcher() {
+
+        @Override
+        public void lostLock(final LockLossReason reason) {
+          Halt.halt(serverStopRequested ? 0 : 1, () -> {
+            if (!serverStopRequested) {
+              LOG.error("Lost tablet server lock (reason = {}), exiting.", reason);
+            }
+            gcLogger.logGCInfo(getConfiguration());
+          });
+        }
+
+        @Override
+        public void unableToMonitorLockNode(final Exception e) {
+          Halt.halt(1, () -> LOG.error("Lost ability to monitor scan server lock, exiting.", e));
+        }
+      };
+
+      // Don't use the normal ServerServices lock content, instead put the server UUID here.
+      byte[] lockContent = (serverLockUUID.toString() + "," + groupName).getBytes(UTF_8);
+
+      for (int i = 0; i < 120 / 5; i++) {
+        zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP);
+
+        if (scanServerLock.tryLock(lw, lockContent)) {
+          LOG.debug("Obtained scan server lock {}", scanServerLock.getLockPath());
+          return scanServerLock;
+        }
+        LOG.info("Waiting for scan server lock");
+        sleepUninterruptibly(5, TimeUnit.SECONDS);
+      }
+      String msg = "Too many retries, exiting.";
+      LOG.info(msg);
+      throw new RuntimeException(msg);
+    } catch (Exception e) {
+      LOG.info("Could not obtain scan server lock, exiting.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Main loop of the scan server: logs in, starts the thrift client service, initializes metrics,
+   * acquires the ZooKeeper lock and then idles until a stop is requested. On the way out it stops
+   * the thrift server, removes this server's scan file references, closes the volume manager and
+   * releases the lock.
+   *
+   * @throws RuntimeException
+   *           if the client service cannot be started or the lock cannot be obtained
+   */
+  @Override
+  public void run() {
+    SecurityUtil.serverLogin(getConfiguration());
+
+    ServerAddress address = null;
+    try {
+      address = startScanServerClientService();
+      clientAddress = address.getAddress();
+    } catch (UnknownHostException e1) {
+      throw new RuntimeException("Failed to start the scan server client service", e1);
+    }
+
+    // a metrics failure is not fatal; the server keeps running without emitting metrics
+    try {
+      MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName,
+          clientAddress);
+    } catch (Exception e1) {
+      LOG.error("Error initializing metrics, metrics will not be emitted.", e1);
+    }
+    scanMetrics = new TabletServerScanMetrics();
+    MetricsUtil.initializeProducers(scanMetrics);
+
+    ServiceLock lock = announceExistence();
+
+    try {
+      while (!serverStopRequested) {
+        UtilWaitThread.sleep(1000);
+      }
+    } finally {
+      LOG.info("Stopping Thrift Servers");
+      address.server.stop();
+
+      LOG.info("Removing server scan references");
+      this.getContext().getAmple().deleteScanServerFileReferences(clientAddress.toString(),
+          serverLockUUID);
+
+      try {
+        LOG.debug("Closing filesystems");
+        VolumeManager mgr = getContext().getVolumeManager();
+        if (null != mgr) {
+          mgr.close();
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
+      }
+
+      gcLogger.logGCInfo(getConfiguration());
+      LOG.info("stop requested. exiting ... ");
+      // releasing the lock is best effort; a failure is only logged
+      try {
+        if (null != lock) {
+          lock.unlock();
+        }
+      } catch (Exception e) {
+        LOG.warn("Failed to release scan server lock", e);
+      }
+
+    }
+  }
+
+  /**
+   * Looks up the metadata for the given extents, through the cache when caching is enabled and
+   * directly through the loader otherwise.
+   *
+   * @param extents
+   *          extents to look up; any collection type is accepted
+   * @return map from extent to metadata for the extents that were found
+   */
+  private Map<KeyExtent,TabletMetadata> getTabletMetadata(Collection<KeyExtent> extents) {
+    if (tabletMetadataCache != null) {
+      return tabletMetadataCache.getAll(extents);
+    }
+
+    // The loader requires a Set. Callers are only required to pass a Collection, so copy when the
+    // argument is not already a Set instead of casting blindly (which would throw
+    // ClassCastException for e.g. a List).
+    Set<KeyExtent> extentSet =
+        extents instanceof Set ? (Set<KeyExtent>) extents : new HashSet<>(extents);
+
+    // loadAll is declared with wildcard types but only ever produces KeyExtent keys and
+    // TabletMetadata values, so the cast is safe; suppression is limited to this assignment
+    @SuppressWarnings("unchecked")
+    Map<KeyExtent,TabletMetadata> loaded =
+        (Map<KeyExtent,TabletMetadata>) tabletMetadataLoader.loadAll(extentSet);
+    return loaded;
+  }
+
+  static class ReservedFile {
+    final Set<Long> activeReservations = new ConcurrentSkipListSet<>();
+    final AtomicLong lastUseTime = new AtomicLong(0);
+
+    boolean shouldDelete(long expireTimeMs) {
+      return activeReservations.isEmpty()
+          && System.currentTimeMillis() - lastUseTime.get() > expireTimeMs;
+    }
+  }
+
+  /**
+   * Handle for a set of files reserved for one scan. Closing it removes this reservation's id
+   * from every reserved file and records the release time, so the handle is meant to be used in a
+   * try-with-resources block.
+   */
+  class ScanReservation implements AutoCloseable {
+
+    private final Collection<StoredTabletFile> files;
+    private final long myReservationId;
+    // null when the reservation was created from a plain file collection
+    private final Map<KeyExtent,TabletMetadata> tabletsMetadata;
+
+    /**
+     * Creates a reservation covering every file of the given tablets.
+     */
+    ScanReservation(Map<KeyExtent,TabletMetadata> tabletsMetadata, long myReservationId) {
+      this.tabletsMetadata = tabletsMetadata;
+      this.files = tabletsMetadata.values().stream().flatMap(tm -> tm.getFiles().stream())
+          .collect(Collectors.toUnmodifiableSet());
+      this.myReservationId = myReservationId;
+    }
+
+    /**
+     * Creates a reservation for an explicit set of files; no tablet metadata is available from a
+     * reservation built this way.
+     */
+    ScanReservation(Collection<StoredTabletFile> files, long myReservationId) {
+      this.tabletsMetadata = null;
+      this.files = files;
+      this.myReservationId = myReservationId;
+    }
+
+    /**
+     * @return the metadata recorded for the extent. Throws NullPointerException when this
+     *         reservation was created from a file collection, because no metadata map exists then.
+     */
+    public TabletMetadata getTabletMetadata(KeyExtent extent) {
+      return tabletsMetadata.get(extent);
+    }
+
+    /**
+     * Creates a snapshot tablet for the extent from the metadata held by this reservation.
+     */
+    SnapshotTablet newTablet(ScanServer server, KeyExtent extent) throws IOException {
+      var tabletMetadata = getTabletMetadata(extent);
+      TabletResourceManager trm =
+          resourceManager.createTabletResourceManager(tabletMetadata.getExtent(),
+              context.getTableConfiguration(tabletMetadata.getExtent().tableId()));
+      return new SnapshotTablet(server, tabletMetadata, trm);
+    }
+
+    @Override
+    public void close() {
+      // There is no need to get a lock for removing our reservations. The reservation was added
+      // with a lock held and once its added that prevents file from being removed.
+      for (StoredTabletFile file : files) {
+        var reservedFile = reservedFiles.get(file);
+
+        // a missing id means the bookkeeping is corrupt, e.g. close() was called twice
+        if (!reservedFile.activeReservations.remove(myReservationId)) {
+          throw new IllegalStateException("reservation id was not in set as expected");
+        }
+
+        LOG.trace("RFFS {} unreserved reference for file {}", myReservationId, file);
+
+        reservedFile.lastUseTime.set(System.currentTimeMillis());
+      }
+    }
+  }
+
+  /**
+   * Makes one attempt to reserve every file of the given extents for a scan.
+   *
+   * <p>
+   * If all files already have reservations, the reservation id is simply added under the read
+   * lock. Otherwise the files are marked as in flux, scan server references are written to the
+   * metadata table, and the tablet metadata is re-read to verify the tablets still reference the
+   * files.
+   *
+   * @param extents
+   *          extents that are about to be scanned
+   * @param myReservationId
+   *          id identifying this reservation
+   * @return the tablet metadata the reservation is based on, or {@code null} when the tablets'
+   *         files changed while reserving and the caller should try again
+   * @throws NotServingTabletException
+   *           if an extent is missing from the metadata table or fails the assignment check
+   * @throws RuntimeException
+   *           if the thread is interrupted while waiting for in-flux files; the interrupt status
+   *           is restored before throwing
+   */
+  private Map<KeyExtent,TabletMetadata> reserveFilesInner(Collection<KeyExtent> extents,
+      long myReservationId) throws NotServingTabletException, AccumuloException {
+    // RFFS is an acronym for Reference files for scan
+    LOG.debug("RFFS {} ensuring files are referenced for scan of extents {}", myReservationId,
+        extents);
+
+    Map<KeyExtent,TabletMetadata> tabletsMetadata = getTabletMetadata(extents);
+
+    for (KeyExtent extent : extents) {
+      var tabletMetadata = tabletsMetadata.get(extent);
+      if (tabletMetadata == null) {
+        LOG.info("RFFS {} extent not found in metadata table {}", myReservationId, extent);
+        throw new NotServingTabletException(extent.toThrift());
+      }
+
+      if (!AssignmentHandler.checkTabletMetadata(extent, null, tabletMetadata, true)) {
+        LOG.info("RFFS {} extent unable to load {} as AssignmentHandler returned false",
+            myReservationId, extent);
+        throw new NotServingTabletException(extent.toThrift());
+      }
+    }
+
+    Map<StoredTabletFile,KeyExtent> allFiles = new HashMap<>();
+
+    tabletsMetadata.forEach((extent, tm) -> {
+      tm.getFiles().forEach(file -> allFiles.put(file, extent));
+    });
+
+    // The read lock prevents anything from being removed from reservedFiles while adding
+    // reservations to the values of reservedFiles. Using a read lock avoids scans from having to
+    // wait on each other when their files are already reserved.
+    reservationsReadLock.lock();
+    try {
+      if (reservedFiles.keySet().containsAll(allFiles.keySet())) {
+        // all files already have reservations in the metadata table, so we can add ourself
+        for (StoredTabletFile file : allFiles.keySet()) {
+          if (!reservedFiles.get(file).activeReservations.add(myReservationId)) {
+            throw new IllegalStateException("reservation id unexpectedly already in set");
+          }
+        }
+
+        return tabletsMetadata;
+      }
+    } finally {
+      reservationsReadLock.unlock();
+    }
+
+    // reservations do not exist in the metadata table, so we will attempt to add them
+    reservationsWriteLock.lock();
+    try {
+      // wait if another thread is working on the files we are interested in
+      while (!Collections.disjoint(influxFiles, allFiles.keySet())) {
+        reservationCondition.await();
+      }
+
+      // add files to reserve to influxFiles so that no other thread tries to add or remove these
+      // file from the metadata table or the reservedFiles map
+      influxFiles.addAll(allFiles.keySet());
+    } catch (InterruptedException e) {
+      // restore the interrupt status so code further up the stack can still observe it
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } finally {
+      reservationsWriteLock.unlock();
+    }
+
+    // do not add any code here that could cause an exception which could lead to files not being
+    // removed from influxFiles
+
+    try {
+      Set<StoredTabletFile> filesToReserve = new HashSet<>();
+      List<ScanServerRefTabletFile> refs = new ArrayList<>();
+      Set<KeyExtent> tabletsToCheck = new HashSet<>();
+
+      String serverAddress = clientAddress.toString();
+
+      for (StoredTabletFile file : allFiles.keySet()) {
+        if (!reservedFiles.containsKey(file)) {
+          refs.add(new ScanServerRefTabletFile(file.getPathStr(), serverAddress, serverLockUUID));
+          filesToReserve.add(file);
+          tabletsToCheck.add(Objects.requireNonNull(allFiles.get(file)));
+          LOG.trace("RFFS {} need to add scan ref for file {}", myReservationId, file);
+        }
+      }
+
+      if (!filesToReserve.isEmpty()) {
+        getContext().getAmple().putScanServerFileReferences(refs);
+
+        // After we insert the scan server refs we need to check and see if the tablet is still
+        // using the file. As long as the tablet is still using the files then the Accumulo GC
+        // should not have deleted the files. This assumes the Accumulo GC reads scan server refs
+        // after tablet refs from the metadata table.
+
+        if (tabletMetadataCache != null) {
+          // lets clear the cache so we get the latest
+          tabletMetadataCache.invalidateAll(tabletsToCheck);
+        }
+
+        var tabletsToCheckMetadata = getTabletMetadata(tabletsToCheck);
+
+        for (KeyExtent extent : tabletsToCheck) {
+          TabletMetadata metadataAfter = tabletsToCheckMetadata.get(extent);
+          if (metadataAfter == null) {
+            getContext().getAmple().deleteScanServerFileReferences(refs);
+            LOG.info("RFFS {} extent unable to load {} as metadata no longer referencing files",
+                myReservationId, extent);
+            throw new NotServingTabletException(extent.toThrift());
+          }
+
+          // remove files that are still referenced
+          filesToReserve.removeAll(metadataAfter.getFiles());
+        }
+
+        // if this is not empty it means some files that we reserved are no longer referenced by
+        // tablets. This means there could have been a time gap where nothing referenced a file
+        // meaning it could have been GCed.
+        if (!filesToReserve.isEmpty()) {
+          LOG.info("RFFS {} tablet files changed while attempting to reference files {}",
+              myReservationId, filesToReserve);
+          getContext().getAmple().deleteScanServerFileReferences(refs);
+          return null;
+        }
+      }
+
+      // we do not hold a lock but the files we are adding are in influxFiles so its ok to add to
+      // reservedFiles
+      for (StoredTabletFile file : allFiles.keySet()) {
+        if (!reservedFiles.computeIfAbsent(file, k -> new ReservedFile()).activeReservations
+            .add(myReservationId)) {
+          throw new IllegalStateException("reservation id unexpectedly already in set");
+        }
+
+        LOG.trace("RFFS {} reserved reference for startScan {}", myReservationId, file);
+      }
+
+      return tabletsMetadata;
+    } finally {
+      reservationsWriteLock.lock();
+      try {
+        allFiles.keySet().forEach(file -> Preconditions.checkState(influxFiles.remove(file)));
+        // Waiters each wait for a different set of files to leave influxFiles, so every one of
+        // them must re-check its own condition; waking a single arbitrary waiter could leave a
+        // thread that is now able to proceed asleep.
+        reservationCondition.signalAll();
+      } finally {
+        reservationsWriteLock.unlock();
+      }
+    }
+  }
+
+  protected ScanReservation reserveFiles(Collection<KeyExtent> extents)
+      throws NotServingTabletException, AccumuloException {
+
+    long myReservationId = nextScanReservationId.incrementAndGet();
+
+    Map<KeyExtent,TabletMetadata> tabletsMetadata = reserveFilesInner(extents, myReservationId);
+    while (tabletsMetadata == null) {
+      tabletsMetadata = reserveFilesInner(extents, myReservationId);
+    }
+
+    return new ScanReservation(tabletsMetadata, myReservationId);
+  }
+
+  /**
+   * Re-reserves the files of an existing scan session so the scan can continue. Only succeeds
+   * while every file of the session is still present in reservedFiles; nothing is written to the
+   * metadata table here.
+   *
+   * @param scanId
+   *          id of the scan session to continue
+   * @return a reservation over the session's files that must be closed by the caller
+   * @throws NoSuchScanIDException
+   *           if the session does not exist, or if some of its files are no longer reserved
+   * @throws IllegalArgumentException
+   *           if the session is neither a single scan nor a multi scan session
+   */
+  protected ScanReservation reserveFiles(long scanId) throws NoSuchScanIDException {
+    var session = (ScanSession) sessionManager.getSession(scanId);
+    if (session == null) {
+      throw new NoSuchScanIDException();
+    }
+
+    Set<StoredTabletFile> scanSessionFiles;
+
+    // collect an immutable snapshot of the data files of every tablet the session scans
+    if (session instanceof SingleScanSession) {
+      var sss = (SingleScanSession) session;
+      scanSessionFiles =
+          Set.copyOf(session.getTabletResolver().getTablet(sss.extent).getDatafiles().keySet());
+    } else if (session instanceof MultiScanSession) {
+      var mss = (MultiScanSession) session;
+      scanSessionFiles = mss.exents.stream()
+          .flatMap(e -> mss.getTabletResolver().getTablet(e).getDatafiles().keySet().stream())
+          .collect(Collectors.toUnmodifiableSet());
+    } else {
+      throw new IllegalArgumentException("Unknown session type " + session.getClass().getName());
+    }
+
+    long myReservationId = nextScanReservationId.incrementAndGet();
+    // we are only reserving if the files already exists in reservedFiles, so only need the read
+    // lock which prevents deletions from reservedFiles while we mutate the values of reservedFiles
+    reservationsReadLock.lock();
+    try {
+      if (!reservedFiles.keySet().containsAll(scanSessionFiles)) {
+        // the files are no longer reserved in the metadata table, so lets pretend there is no scan
+        // session
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("RFFS {} files are no longer referenced on continue scan {} {}",
+              myReservationId, scanId, Sets.difference(scanSessionFiles, reservedFiles.keySet()));
+        }
+        throw new NoSuchScanIDException();
+      }
+
+      for (StoredTabletFile file : scanSessionFiles) {
+        if (!reservedFiles.get(file).activeReservations.add(myReservationId)) {
+          throw new IllegalStateException("reservation id unexpectedly already in set");
+        }
+
+        LOG.trace("RFFS {} reserved reference for continue scan {} {}", myReservationId, scanId,
+            file);
+      }
+    } finally {
+      reservationsReadLock.unlock();
+    }
+
+    return new ScanReservation(scanSessionFiles, myReservationId);
+  }
+
+  private void cleanUpReservedFiles(long expireTimeMs) {
+
+    // Do a quick check to see if there is any potential work. This check is done to avoid acquiring
+    // the write lock unless its needed since the write lock can be disruptive for the read lock.
+    if (reservedFiles.values().stream().anyMatch(rf -> rf.shouldDelete(expireTimeMs))) {
+
+      List<ScanServerRefTabletFile> refsToDelete = new ArrayList<>();
+      List<StoredTabletFile> confirmed = new ArrayList<>();
+      String serverAddress = clientAddress.toString();
+
+      reservationsWriteLock.lock();
+      try {
+        var reservedIter = reservedFiles.entrySet().iterator();
+
+        while (reservedIter.hasNext()) {
+          var entry = reservedIter.next();
+          var file = entry.getKey();

Review Comment:
   `.forEach()` might make more sense than an entry set iterator (unless you're avoiding that to allow checked exceptions to get out of the block). Still, in that case, a regular for loop over the entry set would probably be better than getting an iterator and doing a while loop.



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java:
##########
@@ -84,7 +85,8 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>> {
         new ScanState(context, tableId, authorizations, new Range(range), options.fetchedColumns,
             size, options.serverSideIteratorList, options.serverSideIteratorOptions, isolated,
             readaheadThreshold, options.getSamplerConfiguration(), options.batchTimeOut,
-            options.classLoaderContext, options.executionHints);
+            options.classLoaderContext, options.executionHints,
+            (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) ? true : false);

Review Comment:
   This ternary operator is redundant... the equals method already returns true or false. Also, this looks like an enum comparison and should use `==` for type safety instead of `.equals`.



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java:
##########
@@ -0,0 +1,987 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
+import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
+import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.ScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.TColumn;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TRange;
+import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
+import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.spi.scan.ScanServerSelector;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
+import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
+import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ServiceLock;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.AbstractServer;
+import org.apache.accumulo.server.GarbageCollectionLogger;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
+import org.apache.accumulo.server.security.SecurityUtil;
+import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
+import org.apache.accumulo.tserver.session.MultiScanSession;
+import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.session.ScanSession.TabletResolver;
+import org.apache.accumulo.tserver.session.Session;
+import org.apache.accumulo.tserver.session.SessionManager;
+import org.apache.accumulo.tserver.session.SingleScanSession;
+import org.apache.accumulo.tserver.tablet.SnapshotTablet;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.zookeeper.KeeperException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+public class ScanServer extends AbstractServer
+    implements TabletScanClientService.Iface, TabletHostingServer {
+
+  public static class ScanServerOpts extends ServerOpts {
+    @Parameter(required = false, names = {"-g", "--group"},
+        description = "Optional group name that will be made available to the ScanServerSelector client plugin.  If not specified will be set to '"
+            + ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME
+            + "'.  Groups support at least two use cases : dedicating resources to scans and/or using different hardware for scans.")
+    private String groupName = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
+
+    public String getGroupName() {
+      return groupName;
+    }
+  }
+
+  private static final Logger log = LoggerFactory.getLogger(ScanServer.class);
+
+  /**
+   * Cache loader that reads tablet metadata through {@link Ample}, either for a single tablet or
+   * for a set of tablets, and trace-logs how long each read took.
+   */
+  private static class TabletMetadataLoader implements CacheLoader<KeyExtent,TabletMetadata> {
+
+    private final Ample ample;
+
+    private TabletMetadataLoader(Ample ample) {
+      this.ample = ample;
+    }
+
+    /**
+     * Reads the metadata of a single tablet.
+     *
+     * @param keyExtent
+     *          extent of the tablet to read
+     * @return whatever {@link Ample#readTablet(KeyExtent)} returned for the extent
+     */
+    @Override
+    public @Nullable TabletMetadata load(KeyExtent keyExtent) {
+      long t1 = System.currentTimeMillis();
+      var tm = ample.readTablet(keyExtent);
+      long t2 = System.currentTimeMillis();
+      LOG.trace("Read metadata for 1 tablet in {} ms", t2 - t1);
+      return tm;
+    }
+
+    /**
+     * Reads the metadata of several tablets in one pass.
+     *
+     * @param keys
+     *          extents of the tablets to read
+     * @return the tablet metadata that was read, keyed by extent
+     */
+    @Override
+    public Map<? extends KeyExtent,? extends TabletMetadata>
+        loadAll(Set<? extends KeyExtent> keys) {
+      long t1 = System.currentTimeMillis();
+      Map<KeyExtent,TabletMetadata> tms;
+      // Collections.unmodifiableSet widens Set<? extends KeyExtent> to Set<KeyExtent> in a type
+      // safe way, so no unchecked cast (and no @SuppressWarnings) is needed. The object returned
+      // by build() is closed once its stream has been collected so it is not leaked.
+      try (var tablets =
+          ample.readTablets().forTablets(Collections.unmodifiableSet(keys)).build()) {
+        tms = tablets.stream().collect(Collectors.toMap(TabletMetadata::getExtent, tm -> tm));
+      }
+      long t2 = System.currentTimeMillis();
+      LOG.trace("Read metadata for {} tablets in {} ms", keys.size(), t2 - t1);
+      return tms;
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);
+
+  protected ThriftScanClientHandler delegate;
+  private UUID serverLockUUID;
+  private final TabletMetadataLoader tabletMetadataLoader;
+  private final LoadingCache<KeyExtent,TabletMetadata> tabletMetadataCache;
+  // tracks file reservations that are in the process of being added or removed from the metadata
+  // table
+  private final Set<StoredTabletFile> influxFiles = new HashSet<>();
+  // a read lock that ensures files are not removed from reservedFiles while its held
+  private final ReentrantReadWriteLock.ReadLock reservationsReadLock;
+  // a write lock that must be held when mutating influxFiles or when removing entries from
+  // reservedFiles
+  private final ReentrantReadWriteLock.WriteLock reservationsWriteLock;
+  // this condition is used to signal changes to influxFiles
+  private final Condition reservationCondition;
+  // the key is the set of files that have reservations in the metadata table, the value contains
+  // information about which scans are currently using the file
+  private final Map<StoredTabletFile,ReservedFile> reservedFiles = new ConcurrentHashMap<>();
+  private final AtomicLong nextScanReservationId = new AtomicLong();
+
+  private final ServerContext context;
+  private final SessionManager sessionManager;
+  private final TabletServerResourceManager resourceManager;
+  HostAndPort clientAddress;
+  private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
+
+  private volatile boolean serverStopRequested = false;
+  private ServiceLock scanServerLock;
+  protected TabletServerScanMetrics scanMetrics;
+
+  private ZooCache managerLockCache;
+
+  private final String groupName;
+
+  /**
+   * Creates the scan server: sets up the session and resource managers, the reservation locks, the
+   * (optional) tablet metadata cache and the thrift scan handler, and schedules the periodic clean
+   * up of file reservations.
+   *
+   * @param opts
+   *          server options; supplies the scan server group name, which must not be null
+   * @param args
+   *          command line arguments, passed through to the super class
+   */
+  public ScanServer(ScanServerOpts opts, String[] args) {
+    super("sserver", opts, args);
+
+    context = super.getContext();
+    // use the same logger and parameterized style as the rest of this class
+    LOG.info("Version {}", Constants.VERSION);
+    LOG.info("Instance {}", getContext().getInstanceID());
+    this.sessionManager = new SessionManager(context);
+
+    this.resourceManager = new TabletServerResourceManager(context, this);
+
+    this.managerLockCache = new ZooCache(context.getZooReader(), null);
+
+    var readWriteLock = new ReentrantReadWriteLock();
+    reservationsReadLock = readWriteLock.readLock();
+    reservationsWriteLock = readWriteLock.writeLock();
+    reservationCondition = readWriteLock.writeLock().newCondition();
+
+    // Note: The way to control the number of concurrent scans that a ScanServer will
+    // perform is by using Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS or the number
+    // of threads in Property.SSERV_SCAN_EXECUTORS_PREFIX.
+
+    long cacheExpiration =
+        getConfiguration().getTimeInMillis(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION);
+
+    long scanServerReservationExpiration =
+        getConfiguration().getTimeInMillis(Property.SSERVER_SCAN_REFERENCE_EXPIRATION_TIME);
+
+    tabletMetadataLoader = new TabletMetadataLoader(getContext().getAmple());
+
+    // an expiration of zero disables the tablet metadata cache entirely
+    if (cacheExpiration == 0L) {
+      LOG.warn("Tablet metadata caching disabled, may cause excessive scans on metadata table.");
+      tabletMetadataCache = null;
+    } else {
+      if (cacheExpiration < 60000) {
+        LOG.warn(
+            "Tablet metadata caching less than one minute, may cause excessive scans on metadata table.");
+      }
+      tabletMetadataCache =
+          Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS)
+              .scheduler(Scheduler.systemScheduler()).build(tabletMetadataLoader);
+    }
+
+    delegate = newThriftScanClientHandler(new WriteTracker());
+
+    this.groupName = Objects.requireNonNull(opts.getGroupName());
+
+    // the reservation expiration time doubles as the initial delay and the period of the clean up
+    // task
+    ThreadPools.watchCriticalScheduledTask(getContext().getScheduledExecutor()
+        .scheduleWithFixedDelay(() -> cleanUpReservedFiles(scanServerReservationExpiration),
+            scanServerReservationExpiration, scanServerReservationExpiration,
+            TimeUnit.MILLISECONDS));
+
+  }
+
+  @VisibleForTesting
+  protected ThriftScanClientHandler newThriftScanClientHandler(WriteTracker writeTracker) {
+    return new ThriftScanClientHandler(this, writeTracker);
+  }
+
+  /**
+   * Start the thrift service to handle incoming client requests
+   *
+   * @return address of this client service
+   * @throws UnknownHostException
+   *           host unknown
+   */
+  protected ServerAddress startScanServerClientService() throws UnknownHostException {
+
+    // This class implements TabletScanClientService.Iface and then delegates calls. Be sure
+    // to set up the ThriftProcessor using this class, not the delegate.
+    TProcessor processor = ThriftProcessorTypes.getScanServerTProcessor(this, getContext());
+
+    Property maxMessageSizeProperty =
+        (getConfiguration().get(Property.SSERV_MAX_MESSAGE_SIZE) != null
+            ? Property.SSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
+    ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
+        Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(),
+        "Thrift Client Server", Property.SSERV_PORTSEARCH, Property.SSERV_MINTHREADS,
+        Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK, maxMessageSizeProperty);
+
+    LOG.info("address = {}", sp.address);
+    return sp;
+  }
+
+  /**
+   * @return the client address formatted as {@code host:port}, or null when the client address
+   *         has not been set
+   */
+  public String getClientAddressString() {
+    return clientAddress == null ? null
+        : clientAddress.getHost() + ":" + clientAddress.getPort();
+  }
+
+  /**
+   * Set up nodes and locks in ZooKeeper for this Compactor
+   */
+  private ServiceLock announceExistence() {
+    ZooReaderWriter zoo = getContext().getZooReaderWriter();
+    try {
+
+      var zLockPath = ServiceLock.path(
+          getContext().getZooKeeperRoot() + Constants.ZSSERVERS + "/" + getClientAddressString());
+
+      try {
+        // Old zk nodes can be cleaned up by ZooZap
+        zoo.putPersistentData(zLockPath.toString(), new byte[] {}, NodeExistsPolicy.SKIP);
+      } catch (KeeperException e) {
+        if (e.code() == KeeperException.Code.NOAUTH) {
+          LOG.error("Failed to write to ZooKeeper. Ensure that"
+              + " accumulo.properties, specifically instance.secret, is consistent.");
+        }
+        throw e;
+      }
+
+      serverLockUUID = UUID.randomUUID();
+      scanServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, serverLockUUID);
+
+      LockWatcher lw = new LockWatcher() {
+
+        @Override
+        public void lostLock(final LockLossReason reason) {
+          Halt.halt(serverStopRequested ? 0 : 1, () -> {
+            if (!serverStopRequested) {
+              LOG.error("Lost tablet server lock (reason = {}), exiting.", reason);
+            }
+            gcLogger.logGCInfo(getConfiguration());
+          });
+        }
+
+        @Override
+        public void unableToMonitorLockNode(final Exception e) {
+          Halt.halt(1, () -> LOG.error("Lost ability to monitor scan server lock, exiting.", e));
+        }
+      };
+
+      // Don't use the normal ServerServices lock content, instead put the server UUID here.
+      byte[] lockContent = (serverLockUUID.toString() + "," + groupName).getBytes(UTF_8);
+
+      for (int i = 0; i < 120 / 5; i++) {
+        zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP);
+
+        if (scanServerLock.tryLock(lw, lockContent)) {
+          LOG.debug("Obtained scan server lock {}", scanServerLock.getLockPath());
+          return scanServerLock;
+        }
+        LOG.info("Waiting for scan server lock");
+        sleepUninterruptibly(5, TimeUnit.SECONDS);
+      }
+      String msg = "Too many retries, exiting.";
+      LOG.info(msg);
+      throw new RuntimeException(msg);
+    } catch (Exception e) {
+      LOG.info("Could not obtain scan server lock, exiting.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void run() {
+    SecurityUtil.serverLogin(getConfiguration());
+
+    ServerAddress address = null;
+    try {
+      address = startScanServerClientService();
+      clientAddress = address.getAddress();
+    } catch (UnknownHostException e1) {
+      throw new RuntimeException("Failed to start the compactor client service", e1);
+    }
+
+    try {
+      MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName,
+          clientAddress);
+    } catch (Exception e1) {
+      LOG.error("Error initializing metrics, metrics will not be emitted.", e1);

Review Comment:
   Please avoid swallowing overly broad exceptions. RuntimeExceptions (RTEs) should be allowed to fall through rather than being caught here.



##########
core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.data.TabletId;
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A client side plugin that determines what scan servers to use for eventually consistent scans.
+ * When a scanner sets
+ * {@link org.apache.accumulo.core.client.ScannerBase#setConsistencyLevel(ScannerBase.ConsistencyLevel)}
+ * to {@link org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel#EVENTUAL} then this plugin
+ * is used to determine which scan servers to use for a given tablet. To configure a class to use
+ * for this plugin set its name using the client config {@code scan.server.selector.impl}
+ *
+ * @since 2.1.0
+ */
+public interface ScanServerSelector {
+
+  /**
+   * The scan server group name that will be used when one is not specified.
+   */
+  public static final String DEFAULT_SCAN_SERVER_GROUP_NAME = "default";
+
+  /**
+   * Information about a scan server.
+   *
+   * @since 2.1.0
+   */
+  public interface ScanServer {
+    /**
+     * @return the address in the form of {@code <host>:<port>} where the scan server is running.
+     */
+    String getAddress();
+
+    /**
+     * @return the group name set when the scan server was started. If a group name was not set for
+     *         the scan server, then the string {@value #DEFAULT_SCAN_SERVER_GROUP_NAME} is
+     *         returned.
+     */
+    String getGroup();
+  }
+
+  /**
+   * This interface exists so that is easier to evolve what is passed to
+   * {@link #init(InitParameters)} without having to make breaking changes.
+   *
+   * @since 2.1.0
+   */
+  public interface InitParameters {

Review Comment:
   There are a lot of nested interfaces inside this class. These should probably be separate files in the same package. Or, if they aren't required to be implemented by the user's SPI implementation, then they could be moved out of the spi package to reduce user confusion (for example, if they are objects we create and pass to the user's plugin code).



##########
core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java:
##########
@@ -76,7 +76,9 @@ public void testConfiguration() {
     // this is not set for the cache type, so should fall back to default
     cc.set(defaultPrefix + LruBlockCacheConfiguration.MAP_LOAD_PROPERTY, "0.53");
 
-    BlockCacheConfiguration bcc = new BlockCacheConfiguration(cc);
+    BlockCacheConfiguration bcc = new BlockCacheConfiguration(cc, Property.TSERV_PREFIX,
+        Property.TSERV_INDEXCACHE_SIZE, Property.TSERV_DATACACHE_SIZE,
+        Property.TSERV_SUMMARYCACHE_SIZE, Property.TSERV_DEFAULT_BLOCKSIZE);

Review Comment:
   This pattern keeps getting repeated. It's pretty verbose boilerplate to construct the block cache configuration now. It'd probably be easier to pass a flag/enum to specify whether TSERV or SSERV props should be used, and then resolve that logic inside the constructor.



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java:
##########
@@ -136,6 +136,11 @@ public List<String> getManagerLocations() {
     return context.getManagerLocations();
   }
 
+  @Override
+  public List<String> getScanServers() {
+    return List.copyOf(context.getScanServers().keySet());
+  }

Review Comment:
   Why is this returning a list instead of a Set?



##########
core/src/test/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParserTest.java:
##########
@@ -121,12 +131,21 @@ public void testShellOutput() throws Exception {
     ClusterConfigParser.outputShellVariables(contents, ps);
     ps.close();
 
-    Map<String,
-        String> expected = Map.of("MANAGER_HOSTS", "\"localhost1 localhost2\"", "MONITOR_HOSTS",
-            "\"localhost1 localhost2\"", "GC_HOSTS", "\"localhost\"", "TSERVER_HOSTS",
-            "\"localhost1 localhost2 localhost3 localhost4\"", "COORDINATOR_HOSTS",
-            "\"localhost1 localhost2\"", "COMPACTION_QUEUES", "\"q1 q2\"", "COMPACTOR_HOSTS_q1",
-            "\"localhost1 localhost2\"", "COMPACTOR_HOSTS_q2", "\"localhost3 localhost4\"");
+    Map<String,String> expected = new HashMap<>();
+    expected.put("MANAGER_HOSTS", "localhost1 localhost2");
+    expected.put("MONITOR_HOSTS", "localhost1 localhost2");
+    expected.put("GC_HOSTS", "localhost");
+    expected.put("TSERVER_HOSTS", "localhost1 localhost2 localhost3 localhost4");
+    expected.put("COORDINATOR_HOSTS", "localhost1 localhost2");
+    expected.put("COMPACTION_QUEUES", "q1 q2");
+    expected.put("COMPACTOR_HOSTS_q1", "localhost1 localhost2");
+    expected.put("COMPACTOR_HOSTS_q2", "localhost3 localhost4");
+    expected.put("SSERVER_GROUPS", "default highmem cheap");
+    expected.put("SSERVER_HOSTS_default", "localhost1 localhost2");
+    expected.put("SSERVER_HOSTS_highmem", "hmvm1 hmvm2 hmvm3");
+    expected.put("SSERVER_HOSTS_cheap", "burstyvm1 burstyvm2");
+
+    expected = Maps.transformValues(expected, v -> '"' + v + '"');

Review Comment:
   This should use the built-in Stream API to transform the map, rather than relying on Guava.



##########
core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java:
##########
@@ -373,6 +384,23 @@ default void forEach(BiConsumer<? super Key,? super Value> keyValueConsumer) {
     }
   }
 
+  /**
+   * Get the configured consistency level
+   *
+   * @return consistency level
+   * @since 2.1.0
+   */
+  public ConsistencyLevel getConsistencyLevel();
+
+  /**
+   * Set the desired consistency level for this scanner.
+   *
+   * @param level
+   *          consistency level
+   * @since 2.1.0
+   */
+  public void setConsistencyLevel(ConsistencyLevel level);
+

Review Comment:
   I'm still not comfortable with this terminology in the API, but I don't have an alternative at this time, so I'll consider this settled for now.



##########
core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefTabletFile.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.metadata;
+
+import java.util.UUID;
+
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+public class ScanServerRefTabletFile extends TabletFile {
+
+  private final Value NULL_VALUE = new Value(new byte[0]);
+  private final Text colf;
+  private final Text colq;
+
+  public ScanServerRefTabletFile(String file, String serverAddress, UUID serverLockUUID) {
+    super(new Path(file));
+    this.colf = new Text(serverAddress);
+    this.colq = new Text(serverLockUUID.toString());
+  }
+
+  public ScanServerRefTabletFile(String file, Text colf, Text colq) {
+    super(new Path(file));
+    this.colf = colf;
+    this.colq = colq;
+  }
+
+  public String getRowSuffix() {
+    return this.getPathStr();
+  }
+
+  public Text getServerAddress() {
+    return this.colf;
+  }
+
+  public Text getServerLockUUID() {
+    return this.colq;
+  }
+
+  public Value getValue() {
+    return NULL_VALUE;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = super.hashCode();
+    result = prime * result + ((colf == null) ? 0 : colf.hashCode());
+    result = prime * result + ((colq == null) ? 0 : colq.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (!super.equals(obj))
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    ScanServerRefTabletFile other = (ScanServerRefTabletFile) obj;
+    if (colf == null) {

Review Comment:
   Or just `return Objects.equals(field1, other.field1) && Objects.equals(field2, other.field2) && ...`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org