Posted to commits@accumulo.apache.org by ct...@apache.org on 2023/02/08 17:47:28 UTC

[accumulo] branch 2.1 updated (efc882a8c5 -> 895479eb08)

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a change to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


    from efc882a8c5 Switch log level to trace in SystemPropUtil (#3184)
     add b37b0626b6 fix parameter check and minor variable rename (#3176)
     new 895479eb08 Merge branch '1.10' into 2.1

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../core/client/ClientSideIteratorScanner.java     |  8 +++----
 .../accumulo/core/client/IsolatedScanner.java      |  8 +++----
 .../accumulo/core/clientImpl/ScannerIterator.java  |  2 +-
 .../accumulo/core/clientImpl/ScannerOptions.java   | 26 +++++++++++-----------
 .../core/clientImpl/TabletServerBatchReader.java   |  2 +-
 .../TabletServerBatchReaderIterator.java           | 12 +++++-----
 6 files changed, 29 insertions(+), 29 deletions(-)
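
The "fix parameter check" part of b37b0626b6 corrects the timeout setters in
ScannerOptions so they validate the incoming argument rather than the previously
stored field (see the ScannerOptions hunks further below). A minimal standalone
sketch of the corrected semantics; the class name here is illustrative, not the
real org.apache.accumulo.core.clientImpl.ScannerOptions:

    import java.util.concurrent.TimeUnit;

    // Illustrative holder showing the corrected setter semantics: validate the
    // argument, store milliseconds internally, and treat zero as "no timeout".
    class TimeoutHolder {
      private long retryTimeoutMillis = Long.MAX_VALUE; // Long.MAX_VALUE means no timeout

      void setTimeout(long timeout, TimeUnit unit) {
        if (timeout < 0) { // check the incoming parameter, not a previously stored field
          throw new IllegalArgumentException("timeout must be non-negative: " + timeout);
        }
        retryTimeoutMillis = (timeout == 0) ? Long.MAX_VALUE : unit.toMillis(timeout);
      }

      long getTimeout(TimeUnit unit) {
        return unit.convert(retryTimeoutMillis, TimeUnit.MILLISECONDS);
      }
    }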


[accumulo] 01/01: Merge branch '1.10' into 2.1

Posted by ct...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 895479eb08074b70df9ec71046ba6a75c099d360
Merge: efc882a8c5 b37b0626b6
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Wed Feb 8 12:45:19 2023 -0500

    Merge branch '1.10' into 2.1

 .../core/client/ClientSideIteratorScanner.java     |  8 +++----
 .../accumulo/core/client/IsolatedScanner.java      |  8 +++----
 .../accumulo/core/clientImpl/ScannerIterator.java  |  2 +-
 .../accumulo/core/clientImpl/ScannerOptions.java   | 26 +++++++++++-----------
 .../core/clientImpl/TabletServerBatchReader.java   |  2 +-
 .../TabletServerBatchReaderIterator.java           | 12 +++++-----
 6 files changed, 29 insertions(+), 29 deletions(-)
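
In the ClientSideIteratorScanner hunks below, the renamed retryTimeout and
batchTimeout fields are simply pushed down to the wrapped scanner when iterator()
is called. As a hedged usage sketch (assuming a Scanner already obtained from an
AccumuloClient; the helper class and method names are illustrative):

    import java.util.Map.Entry;
    import java.util.concurrent.TimeUnit;

    import org.apache.accumulo.core.client.ClientSideIteratorScanner;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;

    // Hypothetical helper: wrap an existing Scanner so configured iterators run client side.
    class ClientSideScanExample {
      static long countEntries(Scanner scanner) {
        try (ClientSideIteratorScanner csis = new ClientSideIteratorScanner(scanner)) {
          // timeouts set on the wrapper are forwarded to the wrapped scanner in iterator()
          csis.setTimeout(30, TimeUnit.SECONDS);
          csis.setBatchTimeout(5, TimeUnit.SECONDS);
          long count = 0;
          for (Entry<Key,Value> entry : csis) {
            count++;
          }
          return count;
        }
      }
    }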

diff --cc core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
index 908d789588,556bc431f1..542273d470
--- a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
@@@ -213,13 -239,12 +213,13 @@@ public class ClientSideIteratorScanner 
      smi = new ScannerTranslatorImpl(scanner, scanner.getSamplerConfiguration());
      this.range = scanner.getRange();
      this.size = scanner.getBatchSize();
-     this.timeOut = scanner.getTimeout(MILLISECONDS);
-     this.batchTimeOut = scanner.getTimeout(MILLISECONDS);
 -    this.retryTimeout = scanner.getTimeout(TimeUnit.MILLISECONDS);
 -    this.batchTimeout = scanner.getTimeout(TimeUnit.MILLISECONDS);
++    this.retryTimeout = scanner.getTimeout(MILLISECONDS);
++    this.batchTimeout = scanner.getTimeout(MILLISECONDS);
      this.readaheadThreshold = scanner.getReadaheadThreshold();
      SamplerConfiguration samplerConfig = scanner.getSamplerConfiguration();
 -    if (samplerConfig != null)
 +    if (samplerConfig != null) {
        setSamplerConfiguration(samplerConfig);
 +    }
    }
  
    /**
@@@ -232,14 -257,13 +232,14 @@@
    @Override
    public Iterator<Entry<Key,Value>> iterator() {
      smi.scanner.setBatchSize(size);
-     smi.scanner.setTimeout(timeOut, MILLISECONDS);
-     smi.scanner.setBatchTimeout(batchTimeOut, MILLISECONDS);
 -    smi.scanner.setTimeout(retryTimeout, TimeUnit.MILLISECONDS);
 -    smi.scanner.setBatchTimeout(batchTimeout, TimeUnit.MILLISECONDS);
++    smi.scanner.setTimeout(retryTimeout, MILLISECONDS);
++    smi.scanner.setBatchTimeout(batchTimeout, MILLISECONDS);
      smi.scanner.setReadaheadThreshold(readaheadThreshold);
 -    if (isolated)
 +    if (isolated) {
        smi.scanner.enableIsolation();
 -    else
 +    } else {
        smi.scanner.disableIsolation();
 +    }
  
      smi.samplerConfig = getSamplerConfiguration();
  
diff --cc core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
index dcbb190dd3,6cd97de04f..518737b02d
--- a/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
@@@ -227,8 -228,8 +227,8 @@@ public class IsolatedScanner extends Sc
    public IsolatedScanner(Scanner scanner, RowBufferFactory bufferFactory) {
      this.scanner = scanner;
      this.range = scanner.getRange();
-     this.timeOut = scanner.getTimeout(MILLISECONDS);
-     this.batchTimeOut = scanner.getBatchTimeout(MILLISECONDS);
 -    this.retryTimeout = scanner.getTimeout(TimeUnit.MILLISECONDS);
 -    this.batchTimeout = scanner.getBatchTimeout(TimeUnit.MILLISECONDS);
++    this.retryTimeout = scanner.getTimeout(MILLISECONDS);
++    this.batchTimeout = scanner.getBatchTimeout(MILLISECONDS);
      this.batchSize = scanner.getBatchSize();
      this.readaheadThreshold = scanner.getReadaheadThreshold();
      this.bufferFactory = bufferFactory;
@@@ -236,10 -237,28 +236,10 @@@
  
    @Override
    public Iterator<Entry<Key,Value>> iterator() {
-     return new RowBufferingIterator(scanner, this, range, timeOut, batchSize, readaheadThreshold,
-         bufferFactory);
+     return new RowBufferingIterator(scanner, this, range, retryTimeout, batchSize,
+         readaheadThreshold, bufferFactory);
    }
  
 -  @Deprecated
 -  @Override
 -  public void setTimeOut(int timeOut) {
 -    if (timeOut == Integer.MAX_VALUE)
 -      setTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
 -    else
 -      setTimeout(timeOut, TimeUnit.SECONDS);
 -  }
 -
 -  @Deprecated
 -  @Override
 -  public int getTimeOut() {
 -    long timeout = getTimeout(TimeUnit.SECONDS);
 -    if (timeout >= Integer.MAX_VALUE)
 -      return Integer.MAX_VALUE;
 -    return (int) timeout;
 -  }
 -
    @Override
    public void setRange(Range range) {
      this.range = range;
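
The deleted block above is the deprecated int-second timeout API that existed on
the 1.10 side of this merge. For callers migrating off it, a hedged sketch that
mirrors the removed setter's behavior (the helper name is illustrative):

    import java.util.concurrent.TimeUnit;

    import org.apache.accumulo.core.client.Scanner;

    // Hypothetical migration helper mirroring the removed int-based setter.
    class LegacyTimeoutMigration {
      static void applyLegacyTimeout(Scanner scanner, int timeOutSeconds) {
        if (timeOutSeconds == Integer.MAX_VALUE) {
          // the old setter treated MAX_VALUE as "no timeout"
          scanner.setTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        } else {
          scanner.setTimeout(timeOutSeconds, TimeUnit.SECONDS);
        }
      }
    }
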
diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
index c098440027,0000000000..182b912c55
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
@@@ -1,219 -1,0 +1,219 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.clientImpl;
 +
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map.Entry;
 +import java.util.NoSuchElementException;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.client.SampleNotPresentException;
 +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
 +import org.apache.accumulo.core.client.TableDeletedException;
 +import org.apache.accumulo.core.client.TableOfflineException;
 +import org.apache.accumulo.core.clientImpl.ThriftScanner.ScanState;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyValue;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +
 +public class ScannerIterator implements Iterator<Entry<Key,Value>> {
 +
 +  // scanner options
 +  private long timeOut;
 +
 +  // scanner state
 +  private Iterator<KeyValue> iter;
 +  private final ScanState scanState;
 +
 +  private ScannerOptions options;
 +
 +  private Future<List<KeyValue>> readAheadOperation;
 +
 +  private boolean finished = false;
 +
 +  private long batchCount = 0;
 +  private long readaheadThreshold;
 +
 +  private ScannerImpl.Reporter reporter;
 +
 +  private final ClientContext context;
 +
 +  private AtomicBoolean closed = new AtomicBoolean(false);
 +
 +  ScannerIterator(ClientContext context, TableId tableId, Authorizations authorizations,
 +      Range range, int size, long timeOut, ScannerOptions options, boolean isolated,
 +      long readaheadThreshold, ScannerImpl.Reporter reporter) {
 +    this.context = context;
 +    this.timeOut = timeOut;
 +    this.readaheadThreshold = readaheadThreshold;
 +
 +    this.options = new ScannerOptions(options);
 +
 +    this.reporter = reporter;
 +
 +    if (!this.options.fetchedColumns.isEmpty()) {
 +      range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last());
 +    }
 +
 +    scanState = new ScanState(context, tableId, authorizations, new Range(range),
 +        options.fetchedColumns, size, options.serverSideIteratorList,
 +        options.serverSideIteratorOptions, isolated, readaheadThreshold,
-         options.getSamplerConfiguration(), options.batchTimeOut, options.classLoaderContext,
++        options.getSamplerConfiguration(), options.batchTimeout, options.classLoaderContext,
 +        options.executionHints, options.getConsistencyLevel() == ConsistencyLevel.EVENTUAL);
 +
 +    // If we want to start readahead immediately, don't wait for hasNext to be called
 +    if (readaheadThreshold == 0L) {
 +      initiateReadAhead();
 +    }
 +    iter = null;
 +  }
 +
 +  @Override
 +  public boolean hasNext() {
 +    if (finished) {
 +      return false;
 +    }
 +
 +    if (iter != null && iter.hasNext()) {
 +      return true;
 +    }
 +
 +    iter = getNextBatch().iterator();
 +    if (!iter.hasNext()) {
 +      finished = true;
 +      reporter.finished(this);
 +      return false;
 +    }
 +
 +    return true;
 +  }
 +
 +  @Override
 +  public Entry<Key,Value> next() {
 +    if (hasNext()) {
 +      return iter.next();
 +    }
 +    throw new NoSuchElementException();
 +  }
 +
 +  void close() {
 +    // run actual close operation in the background so this does not block.
 +    context.executeCleanupTask(() -> {
 +      synchronized (scanState) {
 +        // this is synchronized so its mutually exclusive with readBatch()
 +        try {
 +          closed.set(true);
 +          ThriftScanner.close(scanState);
 +        } catch (Exception e) {
 +          LoggerFactory.getLogger(ScannerIterator.class)
 +              .debug("Exception when closing scan session", e);
 +        }
 +      }
 +    });
 +  }
 +
 +  private void initiateReadAhead() {
 +    Preconditions.checkState(readAheadOperation == null);
 +    readAheadOperation = context.submitScannerReadAheadTask(this::readBatch);
 +  }
 +
 +  private List<KeyValue> readBatch() throws Exception {
 +
 +    List<KeyValue> batch;
 +
 +    do {
 +      synchronized (scanState) {
 +        // this is synchronized so its mutually exclusive with closing
 +        Preconditions.checkState(!closed.get(), "Scanner was closed");
 +        batch = ThriftScanner.scan(scanState.context, scanState, timeOut);
 +      }
 +    } while (batch != null && batch.isEmpty());
 +
 +    if (batch != null) {
 +      reporter.readBatch(this);
 +    }
 +
 +    return batch == null ? Collections.emptyList() : batch;
 +  }
 +
 +  private List<KeyValue> getNextBatch() {
 +
 +    List<KeyValue> nextBatch;
 +
 +    try {
 +      if (readAheadOperation == null) {
 +        // no read ahead run, fetch the next batch right now
 +        nextBatch = readBatch();
 +      } else {
 +        nextBatch = readAheadOperation.get();
 +        readAheadOperation = null;
 +      }
 +    } catch (ExecutionException ee) {
 +      wrapExecutionException(ee);
 +      throw new RuntimeException(ee);
 +    } catch (RuntimeException e) {
 +      throw e;
 +    } catch (Exception e) {
 +      throw new RuntimeException(e);
 +    }
 +
 +    if (!nextBatch.isEmpty()) {
 +      batchCount++;
 +
 +      if (batchCount > readaheadThreshold) {
 +        // start a thread to read the next batch
 +        initiateReadAhead();
 +      }
 +    }
 +
 +    return nextBatch;
 +  }
 +
 +  private void wrapExecutionException(ExecutionException ee) {
 +    // Need preserve the type of exception that was the cause because some code depends on it.
 +    // However the cause is an exception that occurred in a background thread, so throwing it would
 +    // lose the stack trace for the user thread calling the scanner. Wrapping the exception with the
 +    // same type preserves the type and stack traces (foreground and background thread traces) that
 +    // are critical for debugging.
 +    if (ee.getCause() instanceof IsolationException) {
 +      throw new IsolationException(ee);
 +    }
 +    if (ee.getCause() instanceof TableDeletedException) {
 +      TableDeletedException cause = (TableDeletedException) ee.getCause();
 +      throw new TableDeletedException(cause.getTableId(), cause);
 +    }
 +    if (ee.getCause() instanceof TableOfflineException) {
 +      throw new TableOfflineException(ee);
 +    }
 +    if (ee.getCause() instanceof SampleNotPresentException) {
 +      throw new SampleNotPresentException(ee.getCause().getMessage(), ee);
 +    }
 +  }
 +
 +}
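
The ScannerIterator above implements read-ahead: once batchCount exceeds
readaheadThreshold, the next batch is fetched on a background task while the
current one is consumed. A standalone sketch of that pattern, with illustrative
names and none of the Accumulo-specific scan state:

    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.function.Supplier;

    // Sketch of threshold-triggered read-ahead: after enough batches have been
    // consumed, the next batch is fetched on a background thread.
    class ReadAheadBatcher<T> {
      private final ExecutorService executor = Executors.newSingleThreadExecutor();
      private final Supplier<List<T>> fetcher; // blocking call that reads one batch
      private final long readaheadThreshold;
      private Future<List<T>> pending;
      private long batchCount;

      ReadAheadBatcher(Supplier<List<T>> fetcher, long readaheadThreshold) {
        this.fetcher = fetcher;
        this.readaheadThreshold = readaheadThreshold;
      }

      List<T> nextBatch() throws ExecutionException, InterruptedException {
        List<T> batch = (pending == null) ? fetcher.get() : pending.get();
        pending = null;
        if (!batch.isEmpty() && ++batchCount > readaheadThreshold) {
          pending = executor.submit(fetcher::get); // start reading the next batch early
        }
        return batch;
      }
    }
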
diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerOptions.java
index 3f191a2f49,0000000000..2b044f6fd8
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerOptions.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerOptions.java
@@@ -1,290 -1,0 +1,290 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.clientImpl;
 +
 +import static com.google.common.base.Preconditions.checkArgument;
 +import static java.util.Objects.requireNonNull;
 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
 +
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Objects;
 +import java.util.Set;
 +import java.util.SortedSet;
 +import java.util.TreeSet;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.ScannerBase;
 +import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 +import org.apache.accumulo.core.data.Column;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.TextUtil;
 +import org.apache.hadoop.io.Text;
 +
 +public class ScannerOptions implements ScannerBase {
 +
 +  protected List<IterInfo> serverSideIteratorList = Collections.emptyList();
 +  protected Map<String,Map<String,String>> serverSideIteratorOptions = Collections.emptyMap();
 +
 +  protected SortedSet<Column> fetchedColumns = new TreeSet<>();
 +
-   protected long timeOut = Long.MAX_VALUE;
++  protected long retryTimeout = Long.MAX_VALUE;
 +
-   protected long batchTimeOut = Long.MAX_VALUE;
++  protected long batchTimeout = Long.MAX_VALUE;
 +
 +  private String regexIterName = null;
 +
 +  private SamplerConfiguration samplerConfig = null;
 +
 +  protected String classLoaderContext = null;
 +
 +  protected Map<String,String> executionHints = Collections.emptyMap();
 +
 +  private ConsistencyLevel consistencyLevel = ConsistencyLevel.IMMEDIATE;
 +
 +  protected ScannerOptions() {}
 +
 +  public ScannerOptions(ScannerOptions so) {
 +    setOptions(this, so);
 +  }
 +
 +  @Override
 +  public synchronized void addScanIterator(IteratorSetting si) {
 +    checkArgument(si != null, "si is null");
 +    if (serverSideIteratorList.isEmpty()) {
 +      serverSideIteratorList = new ArrayList<>();
 +    }
 +
 +    for (IterInfo ii : serverSideIteratorList) {
 +      if (ii.iterName.equals(si.getName())) {
 +        throw new IllegalArgumentException("Iterator name is already in use " + si.getName());
 +      }
 +      if (ii.getPriority() == si.getPriority()) {
 +        throw new IllegalArgumentException(
 +            "Iterator priority is already in use " + si.getPriority());
 +      }
 +    }
 +
 +    serverSideIteratorList.add(new IterInfo(si.getPriority(), si.getIteratorClass(), si.getName()));
 +
 +    if (serverSideIteratorOptions.isEmpty()) {
 +      serverSideIteratorOptions = new HashMap<>();
 +    }
 +    serverSideIteratorOptions.computeIfAbsent(si.getName(), k -> new HashMap<>())
 +        .putAll(si.getOptions());
 +  }
 +
 +  @Override
 +  public synchronized void removeScanIterator(String iteratorName) {
 +    checkArgument(iteratorName != null, "iteratorName is null");
 +    // if no iterators are set, we don't have it, so it is already removed
 +    if (serverSideIteratorList.isEmpty()) {
 +      return;
 +    }
 +
 +    for (IterInfo ii : serverSideIteratorList) {
 +      if (ii.iterName.equals(iteratorName)) {
 +        serverSideIteratorList.remove(ii);
 +        break;
 +      }
 +    }
 +
 +    serverSideIteratorOptions.remove(iteratorName);
 +  }
 +
 +  @Override
 +  public synchronized void updateScanIteratorOption(String iteratorName, String key, String value) {
 +    checkArgument(iteratorName != null, "iteratorName is null");
 +    checkArgument(key != null, "key is null");
 +    checkArgument(value != null, "value is null");
 +    if (serverSideIteratorOptions.isEmpty()) {
 +      serverSideIteratorOptions = new HashMap<>();
 +    }
 +    serverSideIteratorOptions.computeIfAbsent(iteratorName, k -> new HashMap<>()).put(key, value);
 +  }
 +
 +  @Override
 +  public synchronized void fetchColumnFamily(Text col) {
 +    checkArgument(col != null, "col is null");
 +    Column c = new Column(TextUtil.getBytes(col), null, null);
 +    fetchedColumns.add(c);
 +  }
 +
 +  @Override
 +  public synchronized void fetchColumn(Text colFam, Text colQual) {
 +    checkArgument(colFam != null, "colFam is null");
 +    checkArgument(colQual != null, "colQual is null");
 +    Column c = new Column(TextUtil.getBytes(colFam), TextUtil.getBytes(colQual), null);
 +    fetchedColumns.add(c);
 +  }
 +
 +  @Override
 +  public void fetchColumn(IteratorSetting.Column column) {
 +    checkArgument(column != null, "Column is null");
 +    fetchColumn(column.getColumnFamily(), column.getColumnQualifier());
 +  }
 +
 +  @Override
 +  public synchronized void clearColumns() {
 +    fetchedColumns.clear();
 +  }
 +
 +  public synchronized SortedSet<Column> getFetchedColumns() {
 +    return fetchedColumns;
 +  }
 +
 +  @Override
 +  public synchronized void clearScanIterators() {
 +    serverSideIteratorList = Collections.emptyList();
 +    serverSideIteratorOptions = Collections.emptyMap();
 +    regexIterName = null;
 +  }
 +
 +  protected static void setOptions(ScannerOptions dst, ScannerOptions src) {
 +    synchronized (dst) {
 +      synchronized (src) {
 +        dst.regexIterName = src.regexIterName;
 +        dst.fetchedColumns = new TreeSet<>(src.fetchedColumns);
 +        dst.serverSideIteratorList = new ArrayList<>(src.serverSideIteratorList);
 +        dst.classLoaderContext = src.classLoaderContext;
 +
 +        dst.serverSideIteratorOptions = new HashMap<>();
 +        Set<Entry<String,Map<String,String>>> es = src.serverSideIteratorOptions.entrySet();
 +        for (Entry<String,Map<String,String>> entry : es) {
 +          dst.serverSideIteratorOptions.put(entry.getKey(), new HashMap<>(entry.getValue()));
 +        }
 +
 +        dst.samplerConfig = src.samplerConfig;
-         dst.batchTimeOut = src.batchTimeOut;
++        dst.batchTimeout = src.batchTimeout;
 +
 +        // its an immutable map, so can avoid copy here
 +        dst.executionHints = src.executionHints;
 +
 +        dst.consistencyLevel = src.consistencyLevel;
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public Iterator<Entry<Key,Value>> iterator() {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  public synchronized void setTimeout(long timeout, TimeUnit timeUnit) {
-     if (timeOut < 0) {
-       throw new IllegalArgumentException("TimeOut must be positive : " + timeOut);
++    if (timeout < 0) {
++      throw new IllegalArgumentException("retry timeout must be positive : " + timeout);
 +    }
 +
 +    if (timeout == 0) {
-       this.timeOut = Long.MAX_VALUE;
++      this.retryTimeout = Long.MAX_VALUE;
 +    } else {
-       this.timeOut = timeUnit.toMillis(timeout);
++      this.retryTimeout = timeUnit.toMillis(timeout);
 +    }
 +  }
 +
 +  @Override
 +  public synchronized long getTimeout(TimeUnit timeunit) {
-     return timeunit.convert(timeOut, MILLISECONDS);
++    return timeunit.convert(retryTimeout, MILLISECONDS);
 +  }
 +
 +  @Override
 +  public void close() {
 +    // Nothing needs to be closed
 +  }
 +
 +  @Override
 +  public Authorizations getAuthorizations() {
 +    throw new UnsupportedOperationException("No authorizations to return");
 +  }
 +
 +  @Override
 +  public synchronized void setSamplerConfiguration(SamplerConfiguration samplerConfig) {
 +    requireNonNull(samplerConfig);
 +    this.samplerConfig = samplerConfig;
 +  }
 +
 +  @Override
 +  public synchronized SamplerConfiguration getSamplerConfiguration() {
 +    return samplerConfig;
 +  }
 +
 +  @Override
 +  public synchronized void clearSamplerConfiguration() {
 +    this.samplerConfig = null;
 +  }
 +
 +  @Override
 +  public void setBatchTimeout(long timeout, TimeUnit timeUnit) {
-     if (timeOut < 0) {
-       throw new IllegalArgumentException("Batch timeout must be positive : " + timeOut);
++    if (timeout < 0) {
++      throw new IllegalArgumentException("Batch timeout must be positive : " + timeout);
 +    }
 +    if (timeout == 0) {
-       this.batchTimeOut = Long.MAX_VALUE;
++      this.batchTimeout = Long.MAX_VALUE;
 +    } else {
-       this.batchTimeOut = timeUnit.toMillis(timeout);
++      this.batchTimeout = timeUnit.toMillis(timeout);
 +    }
 +  }
 +
 +  @Override
 +  public long getBatchTimeout(TimeUnit timeUnit) {
-     return timeUnit.convert(batchTimeOut, MILLISECONDS);
++    return timeUnit.convert(batchTimeout, MILLISECONDS);
 +  }
 +
 +  @Override
 +  public void setClassLoaderContext(String classLoaderContext) {
 +    requireNonNull(classLoaderContext, "classloader context name cannot be null");
 +    this.classLoaderContext = classLoaderContext;
 +  }
 +
 +  @Override
 +  public void clearClassLoaderContext() {
 +    this.classLoaderContext = null;
 +  }
 +
 +  @Override
 +  public String getClassLoaderContext() {
 +    return this.classLoaderContext;
 +  }
 +
 +  @Override
 +  public synchronized void setExecutionHints(Map<String,String> hints) {
 +    this.executionHints = Map.copyOf(Objects.requireNonNull(hints));
 +  }
 +
 +  @Override
 +  public ConsistencyLevel getConsistencyLevel() {
 +    return consistencyLevel;
 +  }
 +
 +  @Override
 +  public void setConsistencyLevel(ConsistencyLevel level) {
 +    this.consistencyLevel = Objects.requireNonNull(level);
 +  }
 +
 +}
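
setOptions above copies each nested entry of serverSideIteratorOptions into a
fresh HashMap so later changes to the source scanner's options cannot leak into
the copy. The same defensive-copy idiom in isolation (illustrative helper, not
Accumulo code):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative deep copy of nested option maps (the idiom setOptions uses).
    class OptionsCopy {
      static Map<String,Map<String,String>> deepCopy(Map<String,Map<String,String>> src) {
        Map<String,Map<String,String>> dst = new HashMap<>();
        src.forEach((name, opts) -> dst.put(name, new HashMap<>(opts)));
        return dst;
      }
    }
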
diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
index c0de759516,0000000000..5c2f6229e8
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
@@@ -1,121 -1,0 +1,121 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.clientImpl;
 +
 +import static com.google.common.base.Preconditions.checkArgument;
 +
 +import java.lang.ref.Cleaner.Cleanable;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Iterator;
 +import java.util.Map.Entry;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.cleaner.CleanerUtil;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public class TabletServerBatchReader extends ScannerOptions implements BatchScanner {
 +  private static final Logger log = LoggerFactory.getLogger(TabletServerBatchReader.class);
 +  private static final AtomicInteger nextBatchReaderInstance = new AtomicInteger(1);
 +
 +  private final int batchReaderInstance = nextBatchReaderInstance.getAndIncrement();
 +  private final TableId tableId;
 +  private final String tableName;
 +  private final int numThreads;
 +  private final ThreadPoolExecutor queryThreadPool;
 +  private final ClientContext context;
 +  private final Authorizations authorizations;
 +  private final AtomicBoolean closed = new AtomicBoolean(false);
 +  private final Cleanable cleanable;
 +
 +  private ArrayList<Range> ranges = null;
 +
 +  public TabletServerBatchReader(ClientContext context, TableId tableId, String tableName,
 +      Authorizations authorizations, int numQueryThreads) {
 +    this(context, BatchScanner.class, tableId, tableName, authorizations, numQueryThreads);
 +  }
 +
 +  protected TabletServerBatchReader(ClientContext context, Class<?> scopeClass, TableId tableId,
 +      String tableName, Authorizations authorizations, int numQueryThreads) {
 +    checkArgument(context != null, "context is null");
 +    checkArgument(tableId != null, "tableId is null");
 +    checkArgument(authorizations != null, "authorizations is null");
 +    this.context = context;
 +    this.authorizations = authorizations;
 +    this.tableId = tableId;
 +    this.tableName = tableName;
 +    this.numThreads = numQueryThreads;
 +
 +    queryThreadPool = context.threadPools().createFixedThreadPool(numQueryThreads,
 +        "batch scanner " + batchReaderInstance + "-", false);
 +    // Call shutdown on this thread pool in case the caller does not call close().
 +    cleanable = CleanerUtil.shutdownThreadPoolExecutor(queryThreadPool, closed, log);
 +  }
 +
 +  @Override
 +  public void close() {
 +    if (closed.compareAndSet(false, true)) {
 +      // Shutdown the pool
 +      queryThreadPool.shutdownNow();
 +      // deregister the cleaner, will not call shutdownNow() because closed is now true
 +      cleanable.clean();
 +    }
 +  }
 +
 +  @Override
 +  public Authorizations getAuthorizations() {
 +    return authorizations;
 +  }
 +
 +  @Override
 +  public void setRanges(Collection<Range> ranges) {
 +    if (ranges == null || ranges.isEmpty()) {
 +      throw new IllegalArgumentException("ranges must be non null and contain at least 1 range");
 +    }
 +
 +    if (closed.get()) {
 +      throw new IllegalStateException("batch reader closed");
 +    }
 +
 +    this.ranges = new ArrayList<>(ranges);
 +  }
 +
 +  @Override
 +  public Iterator<Entry<Key,Value>> iterator() {
 +    if (ranges == null) {
 +      throw new IllegalStateException("ranges not set");
 +    }
 +
 +    if (closed.get()) {
 +      throw new IllegalStateException("batch reader closed");
 +    }
 +
 +    return new TabletServerBatchReaderIterator(context, tableId, tableName, authorizations, ranges,
-         numThreads, queryThreadPool, this, timeOut);
++        numThreads, queryThreadPool, this, retryTimeout);
 +  }
 +}
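
TabletServerBatchReader registers its query thread pool with a Cleaner (via
CleanerUtil.shutdownThreadPoolExecutor) so the pool is still shut down if a
caller forgets to close() the batch scanner. A minimal sketch of that safety-net
pattern using java.lang.ref.Cleaner; this is illustrative and not the actual
CleanerUtil implementation:

    import java.lang.ref.Cleaner;
    import java.lang.ref.Cleaner.Cleanable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Shut the pool down when close() is called, or when the owner becomes
    // unreachable without ever being closed.
    class PooledResource implements AutoCloseable {
      private static final Cleaner CLEANER = Cleaner.create();

      private final ExecutorService pool = Executors.newFixedThreadPool(4);
      private final AtomicBoolean closed = new AtomicBoolean(false);
      private final Cleanable cleanable;

      PooledResource() {
        // The cleaning action must not capture `this`, only the state it needs.
        ExecutorService p = pool;
        AtomicBoolean c = closed;
        cleanable = CLEANER.register(this, () -> {
          if (!c.get()) {
            p.shutdownNow(); // safety net: caller never called close()
          }
        });
      }

      @Override
      public void close() {
        if (closed.compareAndSet(false, true)) {
          pool.shutdownNow();
          cleanable.clean(); // deregister; the action sees closed == true and does nothing more
        }
      }
    }
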
diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index b2cf5460a6,0000000000..890e8f853d
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@@ -1,943 -1,0 +1,943 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.clientImpl;
 +
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +
 +import java.io.IOException;
 +import java.time.Duration;
 +import java.util.AbstractMap.SimpleImmutableEntry;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.ListIterator;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.NoSuchElementException;
 +import java.util.Set;
 +import java.util.concurrent.ArrayBlockingQueue;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Semaphore;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.SampleNotPresentException;
 +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
 +import org.apache.accumulo.core.client.TableDeletedException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.TimedOutException;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.data.Column;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.TabletId;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.dataImpl.TabletIdImpl;
 +import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
 +import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
 +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 +import org.apache.accumulo.core.dataImpl.thrift.TKeyValue;
 +import org.apache.accumulo.core.dataImpl.thrift.TRange;
 +import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.spi.scan.ScanServerAttempt;
 +import org.apache.accumulo.core.spi.scan.ScanServerSelections;
 +import org.apache.accumulo.core.spi.scan.ScanServerSelector;
 +import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 +import org.apache.accumulo.core.tabletserver.thrift.ScanServerBusyException;
 +import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService;
 +import org.apache.accumulo.core.trace.TraceUtil;
 +import org.apache.accumulo.core.util.ByteBufferUtil;
 +import org.apache.accumulo.core.util.HostAndPort;
 +import org.apache.accumulo.core.util.OpTimer;
 +import org.apache.thrift.TApplicationException;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.transport.TTransportException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value>> {
 +
 +  private static final Logger log = LoggerFactory.getLogger(TabletServerBatchReaderIterator.class);
 +
 +  private final ClientContext context;
 +  private final TableId tableId;
 +  private final String tableName;
 +  private Authorizations authorizations = Authorizations.EMPTY;
 +  private final int numThreads;
 +  private final ExecutorService queryThreadPool;
 +  private final ScannerOptions options;
 +
 +  private ArrayBlockingQueue<List<Entry<Key,Value>>> resultsQueue;
 +  private Iterator<Entry<Key,Value>> batchIterator;
 +  private List<Entry<Key,Value>> batch;
 +  private static final List<Entry<Key,Value>> LAST_BATCH = new ArrayList<>();
 +  private final Object nextLock = new Object();
 +
 +  private long failSleepTime = 100;
 +
 +  private volatile Throwable fatalException = null;
 +
 +  private Map<String,TimeoutTracker> timeoutTrackers;
 +  private Set<String> timedoutServers;
-   private final long timeout;
++  private final long retryTimeout;
 +
 +  private TabletLocator locator;
 +
 +  private ScanServerAttemptsImpl scanAttempts = new ScanServerAttemptsImpl();
 +
 +  public interface ResultReceiver {
 +    void receive(List<Entry<Key,Value>> entries);
 +  }
 +
 +  public TabletServerBatchReaderIterator(ClientContext context, TableId tableId, String tableName,
 +      Authorizations authorizations, ArrayList<Range> ranges, int numThreads,
-       ExecutorService queryThreadPool, ScannerOptions scannerOptions, long timeout) {
++      ExecutorService queryThreadPool, ScannerOptions scannerOptions, long retryTimeout) {
 +
 +    this.context = context;
 +    this.tableId = tableId;
 +    this.tableName = tableName;
 +    this.authorizations = authorizations;
 +    this.numThreads = numThreads;
 +    this.queryThreadPool = queryThreadPool;
 +    this.options = new ScannerOptions(scannerOptions);
 +    resultsQueue = new ArrayBlockingQueue<>(numThreads);
 +
-     this.locator = new TimeoutTabletLocator(timeout, context, tableId);
++    this.locator = new TimeoutTabletLocator(retryTimeout, context, tableId);
 +
 +    timeoutTrackers = Collections.synchronizedMap(new HashMap<>());
 +    timedoutServers = Collections.synchronizedSet(new HashSet<>());
-     this.timeout = timeout;
++    this.retryTimeout = retryTimeout;
 +
 +    if (!options.fetchedColumns.isEmpty()) {
 +      ArrayList<Range> ranges2 = new ArrayList<>(ranges.size());
 +      for (Range range : ranges) {
 +        ranges2.add(range.bound(options.fetchedColumns.first(), options.fetchedColumns.last()));
 +      }
 +
 +      ranges = ranges2;
 +    }
 +
 +    ResultReceiver rr = entries -> {
 +      try {
 +        resultsQueue.put(entries);
 +      } catch (InterruptedException e) {
 +        if (TabletServerBatchReaderIterator.this.queryThreadPool.isShutdown()) {
 +          log.debug("Failed to add Batch Scan result", e);
 +        } else {
 +          log.warn("Failed to add Batch Scan result", e);
 +        }
 +        fatalException = e;
 +        throw new RuntimeException(e);
 +
 +      }
 +    };
 +
 +    try {
 +      lookup(ranges, rr);
 +    } catch (RuntimeException re) {
 +      throw re;
 +    } catch (Exception e) {
 +      throw new RuntimeException("Failed to create iterator", e);
 +    }
 +  }
 +
 +  @Override
 +  public boolean hasNext() {
 +    synchronized (nextLock) {
 +      if (batch == LAST_BATCH) {
 +        return false;
 +      }
 +
 +      if (batch != null && batchIterator.hasNext()) {
 +        return true;
 +      }
 +
 +      // don't have one cached, try to cache one and return success
 +      try {
 +        batch = null;
 +        while (batch == null && fatalException == null && !queryThreadPool.isShutdown()) {
 +          batch = resultsQueue.poll(1, SECONDS);
 +        }
 +
 +        if (fatalException != null) {
 +          if (fatalException instanceof RuntimeException) {
 +            throw (RuntimeException) fatalException;
 +          } else {
 +            throw new RuntimeException(fatalException);
 +          }
 +        }
 +
 +        if (queryThreadPool.isShutdown()) {
 +          String shortMsg =
 +              "The BatchScanner was unexpectedly closed while this Iterator was still in use.";
 +          log.error("{} Ensure that a reference to the BatchScanner is retained"
 +              + " so that it can be closed when this Iterator is exhausted. Not"
 +              + " retaining a reference to the BatchScanner guarantees that you are"
 +              + " leaking threads in your client JVM.", shortMsg);
 +          throw new RuntimeException(shortMsg + " Ensure proper handling of the BatchScanner.");
 +        }
 +
 +        batchIterator = batch.iterator();
 +        return batch != LAST_BATCH;
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public Entry<Key,Value> next() {
 +    // if there's one waiting, or hasNext() can get one, return it
 +    synchronized (nextLock) {
 +      if (hasNext()) {
 +        return batchIterator.next();
 +      } else {
 +        throw new NoSuchElementException();
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public void remove() {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  private synchronized void lookup(List<Range> ranges, ResultReceiver receiver)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +    List<Column> columns = new ArrayList<>(options.fetchedColumns);
 +    ranges = Range.mergeOverlapping(ranges);
 +
 +    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
 +
 +    binRanges(locator, ranges, binnedRanges);
 +
 +    doLookups(binnedRanges, receiver, columns);
 +  }
 +
 +  private void binRanges(TabletLocator tabletLocator, List<Range> ranges,
 +      Map<String,Map<KeyExtent,List<Range>>> binnedRanges)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +
 +    int lastFailureSize = Integer.MAX_VALUE;
 +
 +    while (true) {
 +
 +      binnedRanges.clear();
 +      List<Range> failures = tabletLocator.binRanges(context, ranges, binnedRanges);
 +
 +      if (failures.isEmpty()) {
 +        break;
 +      } else {
 +        // tried to only do table state checks when failures.size() == ranges.size(), however this
 +        // did
 +        // not work because nothing ever invalidated entries in the tabletLocator cache... so even
 +        // though
 +        // the table was deleted the tablet locator entries for the deleted table were not
 +        // cleared... so
 +        // need to always do the check when failures occur
 +        if (failures.size() >= lastFailureSize) {
 +          context.requireNotDeleted(tableId);
 +          context.requireNotOffline(tableId, tableName);
 +        }
 +        lastFailureSize = failures.size();
 +
 +        if (log.isTraceEnabled()) {
 +          log.trace("Failed to bin {} ranges, tablet locations were null, retrying in 100ms",
 +              failures.size());
 +        }
 +
 +        try {
 +          Thread.sleep(100);
 +        } catch (InterruptedException e) {
 +          throw new RuntimeException(e);
 +        }
 +      }
 +
 +    }
 +
 +    // truncate the ranges to within the tablets... this makes it easier to know what work
 +    // needs to be redone when failures occurs and tablets have merged or split
 +    Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<>();
 +    for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) {
 +      Map<KeyExtent,List<Range>> tabletMap = new HashMap<>();
 +      binnedRanges2.put(entry.getKey(), tabletMap);
 +      for (Entry<KeyExtent,List<Range>> tabletRanges : entry.getValue().entrySet()) {
 +        Range tabletRange = tabletRanges.getKey().toDataRange();
 +        List<Range> clippedRanges = new ArrayList<>();
 +        tabletMap.put(tabletRanges.getKey(), clippedRanges);
 +        for (Range range : tabletRanges.getValue()) {
 +          clippedRanges.add(tabletRange.clip(range));
 +        }
 +      }
 +    }
 +
 +    binnedRanges.clear();
 +    binnedRanges.putAll(binnedRanges2);
 +  }
 +
 +  private void processFailures(Map<KeyExtent,List<Range>> failures, ResultReceiver receiver,
 +      List<Column> columns, Duration scanServerSelectorDelay)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +    if (log.isTraceEnabled()) {
 +      log.trace("Failed to execute multiscans against {} tablets, retrying...", failures.size());
 +    }
 +
 +    try {
 +      if (scanServerSelectorDelay != null) {
 +        Thread.sleep(scanServerSelectorDelay.toMillis());
 +      } else {
 +        Thread.sleep(failSleepTime);
 +      }
 +    } catch (InterruptedException e) {
 +      Thread.currentThread().interrupt();
 +
 +      // We were interrupted (close called on batchscanner) just exit
 +      log.debug("Exiting failure processing on interrupt");
 +      return;
 +    }
 +
 +    failSleepTime = Math.min(5000, failSleepTime * 2);
 +
 +    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
 +    List<Range> allRanges = new ArrayList<>();
 +
 +    for (List<Range> ranges : failures.values()) {
 +      allRanges.addAll(ranges);
 +    }
 +
 +    // since the first call to binRanges clipped the ranges to within a tablet, we should not get
 +    // only
 +    // bin to the set of failed tablets
 +    binRanges(locator, allRanges, binnedRanges);
 +
 +    doLookups(binnedRanges, receiver, columns);
 +  }
 +
 +  private String getTableInfo() {
 +    return context.getPrintableTableInfoFromId(tableId);
 +  }
 +
 +  private class QueryTask implements Runnable {
 +
 +    private String tsLocation;
 +    private Map<KeyExtent,List<Range>> tabletsRanges;
 +    private ResultReceiver receiver;
 +    private Semaphore semaphore = null;
 +    private final Map<KeyExtent,List<Range>> failures;
 +    private List<Column> columns;
 +    private int semaphoreSize;
 +    private final long busyTimeout;
 +    private final ScanServerAttemptReporter reporter;
 +    private final Duration scanServerSelectorDelay;
 +
 +    QueryTask(String tsLocation, Map<KeyExtent,List<Range>> tabletsRanges,
 +        Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns,
 +        long busyTimeout, ScanServerAttemptReporter reporter, Duration scanServerSelectorDelay) {
 +      this.tsLocation = tsLocation;
 +      this.tabletsRanges = tabletsRanges;
 +      this.receiver = receiver;
 +      this.columns = columns;
 +      this.failures = failures;
 +      this.busyTimeout = busyTimeout;
 +      this.reporter = reporter;
 +      this.scanServerSelectorDelay = scanServerSelectorDelay;
 +    }
 +
 +    void setSemaphore(Semaphore semaphore, int semaphoreSize) {
 +      this.semaphore = semaphore;
 +      this.semaphoreSize = semaphoreSize;
 +    }
 +
 +    @Override
 +    public void run() {
 +      String threadName = Thread.currentThread().getName();
 +      Thread.currentThread()
 +          .setName(threadName + " looking up " + tabletsRanges.size() + " ranges at " + tsLocation);
 +      log.debug("looking up {} ranges at {}", tabletsRanges.size(), tsLocation);
 +      Map<KeyExtent,List<Range>> unscanned = new HashMap<>();
 +      Map<KeyExtent,List<Range>> tsFailures = new HashMap<>();
 +      try {
 +        TimeoutTracker timeoutTracker = timeoutTrackers.get(tsLocation);
 +        if (timeoutTracker == null) {
-           timeoutTracker = new TimeoutTracker(tsLocation, timedoutServers, timeout);
++          timeoutTracker = new TimeoutTracker(tsLocation, timedoutServers, retryTimeout);
 +          timeoutTrackers.put(tsLocation, timeoutTracker);
 +        }
 +        doLookup(context, tsLocation, tabletsRanges, tsFailures, unscanned, receiver, columns,
 +            options, authorizations, timeoutTracker, busyTimeout);
 +
 +        if (!tsFailures.isEmpty()) {
 +          locator.invalidateCache(tsFailures.keySet());
 +          synchronized (failures) {
 +            failures.putAll(tsFailures);
 +          }
 +        }
 +
 +      } catch (IOException e) {
 +        if (!TabletServerBatchReaderIterator.this.queryThreadPool.isShutdown()) {
 +          synchronized (failures) {
 +            failures.putAll(tsFailures);
 +            failures.putAll(unscanned);
 +          }
 +
 +          locator.invalidateCache(context, tsLocation);
 +        }
 +        log.debug("IOException thrown", e);
 +
 +        ScanServerAttempt.Result result = ScanServerAttempt.Result.ERROR;
 +        if (e.getCause() instanceof ScanServerBusyException) {
 +          result = ScanServerAttempt.Result.BUSY;
 +        }
 +        reporter.report(result);
 +      } catch (AccumuloSecurityException e) {
 +        e.setTableInfo(getTableInfo());
 +        log.debug("AccumuloSecurityException thrown", e);
 +
 +        context.clearTableListCache();
 +        if (context.tableNodeExists(tableId)) {
 +          fatalException = e;
 +        } else {
 +          fatalException = new TableDeletedException(tableId.canonical());
 +        }
 +      } catch (SampleNotPresentException e) {
 +        fatalException = e;
 +      } catch (Exception t) {
 +        if (queryThreadPool.isShutdown()) {
 +          log.debug("Caught exception, but queryThreadPool is shutdown", t);
 +        } else {
 +          log.warn("Caught exception, but queryThreadPool is not shutdown", t);
 +        }
 +        fatalException = t;
 +      } catch (Throwable t) {
 +        fatalException = t;
 +        throw t; // let uncaught exception handler deal with the Error
 +      } finally {
 +        semaphore.release();
 +        Thread.currentThread().setName(threadName);
 +        if (semaphore.tryAcquire(semaphoreSize)) {
 +          // finished processing all queries
 +          if (fatalException == null && !failures.isEmpty()) {
 +            // there were some failures
 +            try {
 +              processFailures(failures, receiver, columns, scanServerSelectorDelay);
 +            } catch (TableNotFoundException | AccumuloException e) {
 +              log.debug("{}", e.getMessage(), e);
 +              fatalException = e;
 +            } catch (AccumuloSecurityException e) {
 +              e.setTableInfo(getTableInfo());
 +              log.debug("{}", e.getMessage(), e);
 +              fatalException = e;
 +            } catch (Exception t) {
 +              log.debug("{}", t.getMessage(), t);
 +              fatalException = t;
 +            }
 +
 +            if (fatalException != null) {
 +              // we are finished with this batch query
 +              if (!resultsQueue.offer(LAST_BATCH)) {
 +                log.debug(
 +                    "Could not add to result queue after seeing fatalException in processFailures",
 +                    fatalException);
 +              }
 +            }
 +          } else {
 +            // we are finished with this batch query
 +            if (fatalException != null) {
 +              if (!resultsQueue.offer(LAST_BATCH)) {
 +                log.debug("Could not add to result queue after seeing fatalException",
 +                    fatalException);
 +              }
 +            } else {
 +              try {
 +                resultsQueue.put(LAST_BATCH);
 +              } catch (InterruptedException e) {
 +                fatalException = e;
 +                if (!resultsQueue.offer(LAST_BATCH)) {
 +                  log.debug("Could not add to result queue after seeing fatalException",
 +                      fatalException);
 +                }
 +              }
 +            }
 +          }
 +        }
 +      }
 +    }
 +
 +  }
 +
 +  private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
 +      final ResultReceiver receiver, List<Column> columns) {
 +
 +    int maxTabletsPerRequest = Integer.MAX_VALUE;
 +
 +    long busyTimeout = 0;
 +    Duration scanServerSelectorDelay = null;
 +    Map<String,ScanServerAttemptReporter> reporters = Map.of();
 +
 +    if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) {
 +      var scanServerData = rebinToScanServers(binnedRanges);
 +      busyTimeout = scanServerData.actions.getBusyTimeout().toMillis();
 +      reporters = scanServerData.reporters;
 +      scanServerSelectorDelay = scanServerData.actions.getDelay();
 +      binnedRanges = scanServerData.binnedRanges;
 +    } else {
 +      // when there are lots of threads and a few tablet servers
 +      // it is good to break request to tablet servers up, the
 +      // following code determines if this is the case
 +      if (numThreads / binnedRanges.size() > 1) {
 +        int totalNumberOfTablets = 0;
 +        for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) {
 +          totalNumberOfTablets += entry.getValue().size();
 +        }
 +
 +        maxTabletsPerRequest = totalNumberOfTablets / numThreads;
 +        if (maxTabletsPerRequest == 0) {
 +          maxTabletsPerRequest = 1;
 +        }
 +      }
 +    }
 +
 +    log.debug("timed out servers: {}", timedoutServers);
 +    log.debug("binned range servers: {}", binnedRanges.keySet());
 +    if (timedoutServers.containsAll(binnedRanges.keySet())) {
 +      // all servers have timed out
 +      throw new TimedOutException(timedoutServers);
 +    }
 +
 +    Map<KeyExtent,List<Range>> failures = new HashMap<>();
 +
 +    if (!timedoutServers.isEmpty()) {
 +      // go ahead and fail any timed out servers
 +      for (Iterator<Entry<String,Map<KeyExtent,List<Range>>>> iterator =
 +          binnedRanges.entrySet().iterator(); iterator.hasNext();) {
 +        Entry<String,Map<KeyExtent,List<Range>>> entry = iterator.next();
 +        if (timedoutServers.contains(entry.getKey())) {
 +          failures.putAll(entry.getValue());
 +          iterator.remove();
 +        }
 +      }
 +    }
 +
 +    // randomize tabletserver order... this will help when there are multiple
 +    // batch readers and writers running against accumulo
 +    List<String> locations = new ArrayList<>(binnedRanges.keySet());
 +    Collections.shuffle(locations);
 +
 +    List<QueryTask> queryTasks = new ArrayList<>();
 +
 +    for (final String tsLocation : locations) {
 +
 +      final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
 +      if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
 +        QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns,
 +            busyTimeout, reporters.getOrDefault(tsLocation, r -> {}), scanServerSelectorDelay);
 +        queryTasks.add(queryTask);
 +      } else {
 +        HashMap<KeyExtent,List<Range>> tabletSubset = new HashMap<>();
 +        for (Entry<KeyExtent,List<Range>> entry : tabletsRanges.entrySet()) {
 +          tabletSubset.put(entry.getKey(), entry.getValue());
 +          if (tabletSubset.size() >= maxTabletsPerRequest) {
 +            QueryTask queryTask =
 +                new QueryTask(tsLocation, tabletSubset, failures, receiver, columns, busyTimeout,
 +                    reporters.getOrDefault(tsLocation, r -> {}), scanServerSelectorDelay);
 +            queryTasks.add(queryTask);
 +            tabletSubset = new HashMap<>();
 +          }
 +        }
 +
 +        if (!tabletSubset.isEmpty()) {
 +          QueryTask queryTask = new QueryTask(tsLocation, tabletSubset, failures, receiver, columns,
 +              busyTimeout, reporters.getOrDefault(tsLocation, r -> {}), scanServerSelectorDelay);
 +          queryTasks.add(queryTask);
 +        }
 +      }
 +    }
 +
 +    final Semaphore semaphore = new Semaphore(queryTasks.size());
 +    semaphore.acquireUninterruptibly(queryTasks.size());
 +
 +    for (QueryTask queryTask : queryTasks) {
 +      queryTask.setSemaphore(semaphore, queryTasks.size());
 +      queryThreadPool.execute(queryTask);
 +    }
 +  }
 +
 +  private static class ScanServerData {
 +    Map<String,Map<KeyExtent,List<Range>>> binnedRanges;
 +    ScanServerSelections actions;
 +    Map<String,ScanServerAttemptReporter> reporters;
 +  }
 +
 +  private ScanServerData rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges) {
 +    ScanServerSelector ecsm = context.getScanServerSelector();
 +
 +    List<TabletIdImpl> tabletIds =
 +        binnedRanges.values().stream().flatMap(extentMap -> extentMap.keySet().stream())
 +            .map(TabletIdImpl::new).collect(Collectors.toList());
 +
 +    // get a snapshot of this once,not each time the plugin request it
 +    var scanAttemptsSnapshot = scanAttempts.snapshot();
 +
 +    ScanServerSelector.SelectorParameters params = new ScanServerSelector.SelectorParameters() {
 +      @Override
 +      public Collection<TabletId> getTablets() {
 +        return Collections.unmodifiableCollection(tabletIds);
 +      }
 +
 +      @Override
 +      public Collection<? extends ScanServerAttempt> getAttempts(TabletId tabletId) {
 +        return scanAttemptsSnapshot.getOrDefault(tabletId, Set.of());
 +      }
 +
 +      @Override
 +      public Map<String,String> getHints() {
 +        return options.executionHints;
 +      }
 +    };
 +
 +    var actions = ecsm.selectServers(params);
 +
 +    Map<KeyExtent,String> extentToTserverMap = new HashMap<>();
 +    Map<KeyExtent,List<Range>> extentToRangesMap = new HashMap<>();
 +
 +    binnedRanges.forEach((server, extentMap) -> {
 +      extentMap.forEach((extent, ranges) -> {
 +        extentToTserverMap.put(extent, server);
 +        extentToRangesMap.put(extent, ranges);
 +      });
 +    });
 +
 +    Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<>();
 +
 +    Map<String,ScanServerAttemptReporter> reporters = new HashMap<>();
 +
 +    for (TabletIdImpl tabletId : tabletIds) {
 +      KeyExtent extent = tabletId.toKeyExtent();
 +      String serverToUse = actions.getScanServer(tabletId);
 +      if (serverToUse == null) {
 +        // no scan server was given so use the tablet server
 +        serverToUse = extentToTserverMap.get(extent);
 +        log.trace("For tablet {} scan server selector chose tablet_server", tabletId);
 +      } else {
 +        log.trace("For tablet {} scan server selector chose scan_server:{}", tabletId, serverToUse);
 +      }
 +
 +      var rangeMap = binnedRanges2.computeIfAbsent(serverToUse, k -> new HashMap<>());
 +      List<Range> ranges = extentToRangesMap.get(extent);
 +      rangeMap.put(extent, ranges);
 +
 +      var server = serverToUse;
 +      reporters.computeIfAbsent(serverToUse, k -> scanAttempts.createReporter(server, tabletId));
 +    }
 +
 +    ScanServerData ssd = new ScanServerData();
 +
 +    ssd.binnedRanges = binnedRanges2;
 +    ssd.actions = actions;
 +    ssd.reporters = reporters;
 +    log.trace("Scan server selector chose delay:{} busyTimeout:{}", actions.getDelay(),
 +        actions.getBusyTimeout());
 +    return ssd;
 +  }
 +
 +  static void trackScanning(Map<KeyExtent,List<Range>> failures,
 +      Map<KeyExtent,List<Range>> unscanned, MultiScanResult scanResult) {
 +
 +    // translate returned failures, remove them from unscanned, and add them to failures
 +    // @formatter:off
 +    Map<KeyExtent, List<Range>> retFailures = scanResult.failures.entrySet().stream().collect(Collectors.toMap(
 +                    entry -> KeyExtent.fromThrift(entry.getKey()),
 +                    entry -> entry.getValue().stream().map(Range::new).collect(Collectors.toList())
 +    ));
 +    // @formatter:on
 +    unscanned.keySet().removeAll(retFailures.keySet());
 +    failures.putAll(retFailures);
 +
 +    // translate full scans and remove them from unscanned
 +    Set<KeyExtent> fullScans =
 +        scanResult.fullScans.stream().map(KeyExtent::fromThrift).collect(Collectors.toSet());
 +    unscanned.keySet().removeAll(fullScans);
 +
 +    // remove partial scan from unscanned
 +    if (scanResult.partScan != null) {
 +      KeyExtent ke = KeyExtent.fromThrift(scanResult.partScan);
 +      Key nextKey = new Key(scanResult.partNextKey);
 +
 +      ListIterator<Range> iterator = unscanned.get(ke).listIterator();
 +      while (iterator.hasNext()) {
 +        Range range = iterator.next();
 +
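 +        // drop ranges the partial scan already covered completely; for the range containing
 +        // nextKey, keep only the unscanned remainder starting at nextKey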
 +        if (range.afterEndKey(nextKey) || (nextKey.equals(range.getEndKey())
 +            && scanResult.partNextKeyInclusive != range.isEndKeyInclusive())) {
 +          iterator.remove();
 +        } else if (range.contains(nextKey)) {
 +          iterator.remove();
 +          Range partRange = new Range(nextKey, scanResult.partNextKeyInclusive, range.getEndKey(),
 +              range.isEndKeyInclusive());
 +          iterator.add(partRange);
 +        }
 +      }
 +    }
 +  }
 +
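 +  /**
 +   * Tracks scan activity against a single server. If no progress is made within the configured
 +   * timeout, the server is added to the set of bad servers and the lookup gives up on it.
 +   */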
 +  private static class TimeoutTracker {
 +
 +    String server;
 +    Set<String> badServers;
 +    long timeOut;
 +    long activityTime;
 +    Long firstErrorTime = null;
 +
 +    TimeoutTracker(String server, Set<String> badServers, long timeOut) {
 +      this(timeOut);
 +      this.server = server;
 +      this.badServers = badServers;
 +    }
 +
 +    TimeoutTracker(long timeOut) {
 +      this.timeOut = timeOut;
 +    }
 +
 +    void startingScan() {
 +      activityTime = System.currentTimeMillis();
 +    }
 +
 +    void check() throws IOException {
 +      if (System.currentTimeMillis() - activityTime > timeOut) {
 +        badServers.add(server);
 +        throw new IOException(
 +            "Time exceeded " + (System.currentTimeMillis() - activityTime) + " " + server);
 +      }
 +    }
 +
 +    void madeProgress() {
 +      activityTime = System.currentTimeMillis();
 +      firstErrorTime = null;
 +    }
 +
 +    void errorOccured() {
 +      if (firstErrorTime == null) {
 +        firstErrorTime = activityTime;
 +      } else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
 +        badServers.add(server);
 +      }
 +    }
 +
 +    public long getTimeOut() {
 +      return timeOut;
 +    }
 +  }
 +
 +  public static void doLookup(ClientContext context, String server,
 +      Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>> failures,
 +      Map<KeyExtent,List<Range>> unscanned, ResultReceiver receiver, List<Column> columns,
 +      ScannerOptions options, Authorizations authorizations)
 +      throws IOException, AccumuloSecurityException, AccumuloServerException {
 +    doLookup(context, server, requested, failures, unscanned, receiver, columns, options,
 +        authorizations, new TimeoutTracker(Long.MAX_VALUE), 0L);
 +  }
 +
 +  static void doLookup(ClientContext context, String server, Map<KeyExtent,List<Range>> requested,
 +      Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned,
 +      ResultReceiver receiver, List<Column> columns, ScannerOptions options,
 +      Authorizations authorizations, TimeoutTracker timeoutTracker, long busyTimeout)
 +      throws IOException, AccumuloSecurityException, AccumuloServerException {
 +
 +    if (requested.isEmpty()) {
 +      return;
 +    }
 +
 +    // copy requested to unscanned map. we will remove ranges as they are scanned in trackScanning()
 +    for (Entry<KeyExtent,List<Range>> entry : requested.entrySet()) {
 +      ArrayList<Range> ranges = new ArrayList<>();
 +      for (Range range : entry.getValue()) {
 +        ranges.add(new Range(range));
 +      }
 +      unscanned.put(KeyExtent.copyOf(entry.getKey()), ranges);
 +    }
 +
 +    timeoutTracker.startingScan();
 +    try {
 +      final HostAndPort parsedServer = HostAndPort.fromString(server);
 +      final TabletScanClientService.Client client;
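 +      // use a client with a shorter RPC timeout when the tracker's timeout is tighter than the
 +      // client's default timeout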
 +      if (timeoutTracker.getTimeOut() < context.getClientTimeoutInMillis()) {
 +        client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context,
 +            timeoutTracker.getTimeOut());
 +      } else {
 +        client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context);
 +      }
 +
 +      try {
 +
 +        OpTimer timer = null;
 +
 +        if (log.isTraceEnabled()) {
 +          log.trace(
 +              "tid={} Starting multi scan, tserver={}  #tablets={}  #ranges={} ssil={} ssio={}",
 +              Thread.currentThread().getId(), server, requested.size(),
 +              sumSizes(requested.values()), options.serverSideIteratorList,
 +              options.serverSideIteratorOptions);
 +
 +          timer = new OpTimer().start();
 +        }
 +
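 +        // the wait for writes flag only needs to be sent on the first multi-scan to a given
 +        // server for this tablet type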
 +        TabletType ttype = TabletType.type(requested.keySet());
 +        boolean waitForWrites = !ThriftScanner.serversWaitedForWrites.get(ttype).contains(server);
 +
 +        // @formatter:off
 +        Map<TKeyExtent, List<TRange>> thriftTabletRanges = requested.entrySet().stream().collect(Collectors.toMap(
 +                        entry -> entry.getKey().toThrift(),
 +                        entry -> entry.getValue().stream().map(Range::toThrift).collect(Collectors.toList())
 +        ));
 +        // @formatter:on
 +
 +        Map<String,String> execHints =
 +            options.executionHints.isEmpty() ? null : options.executionHints;
 +
 +        InitialMultiScan imsr = client.startMultiScan(TraceUtil.traceInfo(), context.rpcCreds(),
 +            thriftTabletRanges, columns.stream().map(Column::toThrift).collect(Collectors.toList()),
 +            options.serverSideIteratorList, options.serverSideIteratorOptions,
 +            ByteBufferUtil.toByteBuffers(authorizations.getAuthorizations()), waitForWrites,
 +            SamplerConfigurationImpl.toThrift(options.getSamplerConfiguration()),
-             options.batchTimeOut, options.classLoaderContext, execHints, busyTimeout);
++            options.batchTimeout, options.classLoaderContext, execHints, busyTimeout);
 +        if (waitForWrites) {
 +          ThriftScanner.serversWaitedForWrites.get(ttype).add(server.toString());
 +        }
 +
 +        MultiScanResult scanResult = imsr.result;
 +
 +        if (timer != null) {
 +          timer.stop();
 +          log.trace("tid={} Got 1st multi scan results, #results={} {} in {}",
 +              Thread.currentThread().getId(), scanResult.results.size(),
 +              (scanResult.more ? "scanID=" + imsr.scanID : ""),
 +              String.format("%.3f secs", timer.scale(SECONDS)));
 +        }
 +
 +        ArrayList<Entry<Key,Value>> entries = new ArrayList<>(scanResult.results.size());
 +        for (TKeyValue kv : scanResult.results) {
 +          entries.add(new SimpleImmutableEntry<>(new Key(kv.key), new Value(kv.value)));
 +        }
 +
 +        if (!entries.isEmpty()) {
 +          receiver.receive(entries);
 +        }
 +
 +        if (!entries.isEmpty() || !scanResult.fullScans.isEmpty()) {
 +          timeoutTracker.madeProgress();
 +        }
 +
 +        trackScanning(failures, unscanned, scanResult);
 +
 +        AtomicLong nextOpid = new AtomicLong();
 +
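 +        // keep continuing the session until the server reports there are no more results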
 +        while (scanResult.more) {
 +
 +          timeoutTracker.check();
 +
 +          if (timer != null) {
 +            log.trace("tid={} oid={} Continuing multi scan, scanid={}",
 +                Thread.currentThread().getId(), nextOpid.get(), imsr.scanID);
 +            timer.reset().start();
 +          }
 +
 +          scanResult = client.continueMultiScan(TraceUtil.traceInfo(), imsr.scanID, busyTimeout);
 +
 +          if (timer != null) {
 +            timer.stop();
 +            log.trace("tid={} oid={} Got more multi scan results, #results={} {} in {}",
 +                Thread.currentThread().getId(), nextOpid.getAndIncrement(),
 +                scanResult.results.size(), (scanResult.more ? " scanID=" + imsr.scanID : ""),
 +                String.format("%.3f secs", timer.scale(SECONDS)));
 +          }
 +
 +          entries = new ArrayList<>(scanResult.results.size());
 +          for (TKeyValue kv : scanResult.results) {
 +            entries.add(new SimpleImmutableEntry<>(new Key(kv.key), new Value(kv.value)));
 +          }
 +
 +          if (!entries.isEmpty()) {
 +            receiver.receive(entries);
 +          }
 +
 +          if (!entries.isEmpty() || !scanResult.fullScans.isEmpty()) {
 +            timeoutTracker.madeProgress();
 +          }
 +
 +          trackScanning(failures, unscanned, scanResult);
 +        }
 +
 +        client.closeMultiScan(TraceUtil.traceInfo(), imsr.scanID);
 +
 +      } finally {
 +        ThriftUtil.returnClient(client, context);
 +      }
 +    } catch (TTransportException e) {
 +      log.debug("Server : {} msg : {}", server, e.getMessage());
 +      timeoutTracker.errorOccured();
 +      throw new IOException(e);
 +    } catch (ThriftSecurityException e) {
 +      log.debug("Server : {} msg : {}", server, e.getMessage(), e);
 +      throw new AccumuloSecurityException(e.user, e.code, e);
 +    } catch (TApplicationException e) {
 +      log.debug("Server : {} msg : {}", server, e.getMessage(), e);
 +      throw new AccumuloServerException(server, e);
 +    } catch (NoSuchScanIDException e) {
 +      log.debug("Server : {} msg : {}", server, e.getMessage(), e);
 +      throw new IOException(e);
 +    } catch (ScanServerBusyException e) {
 +      log.debug("Server : {} msg : {}", server, e.getMessage(), e);
 +      throw new IOException(e);
 +    } catch (TSampleNotPresentException e) {
 +      log.debug("Server : " + server + " msg : " + e.getMessage(), e);
 +      String tableInfo = "?";
 +      if (e.getExtent() != null) {
 +        TableId tableId = KeyExtent.fromThrift(e.getExtent()).tableId();
 +        tableInfo = context.getPrintableTableInfoFromId(tableId);
 +      }
 +      String message = "Table " + tableInfo + " does not have sampling configured or built";
 +      throw new SampleNotPresentException(message, e);
 +    } catch (TException e) {
 +      log.debug("Server : {} msg : {}", server, e.getMessage(), e);
 +      timeoutTracker.errorOccured();
 +      throw new IOException(e);
 +    }
 +  }
 +
 +  static int sumSizes(Collection<List<Range>> values) {
 +    int sum = 0;
 +
 +    for (List<Range> list : values) {
 +      sum += list.size();
 +    }
 +
 +    return sum;
 +  }
 +}