You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2020/04/15 10:50:12 UTC

[accumulo] branch master updated: Fix #1581 Move ThriftClientHandler out of TabletServer (#1584)

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 960a009  Fix #1581 Move ThriftClientHandler out of TabletServer (#1584)
960a009 is described below

commit 960a009f0899df1b5af31be122d468361c6febb4
Author: andrewglowacki <an...@live.com>
AuthorDate: Wed Apr 15 06:50:01 2020 -0400

    Fix #1581 Move ThriftClientHandler out of TabletServer (#1584)
    
    Move ThriftClientHandler, AssignmentHandler and UnloadTabletHandler classes out of
    TabletServer and into their own separate files.
---
 .../server/client/ClientServiceHandler.java        |    6 +-
 .../apache/accumulo/tserver/AssignmentHandler.java |  238 +++
 .../org/apache/accumulo/tserver/TabletServer.java  | 2092 +-------------------
 .../tserver/TabletServerResourceManager.java       |    1 -
 .../accumulo/tserver/ThriftClientHandler.java      | 1792 +++++++++++++++++
 .../accumulo/tserver/UnloadTabletHandler.java      |  141 ++
 6 files changed, 2201 insertions(+), 2069 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
index bbfa6c0..2c8aad9 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
@@ -74,9 +74,9 @@ import org.slf4j.LoggerFactory;
 public class ClientServiceHandler implements ClientService.Iface {
   private static final Logger log = LoggerFactory.getLogger(ClientServiceHandler.class);
   protected final TransactionWatcher transactionWatcher;
-  private final ServerContext context;
-  private final VolumeManager fs;
-  private final SecurityOperation security;
+  protected final ServerContext context;
+  protected final VolumeManager fs;
+  protected final SecurityOperation security;
   private final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();
 
   public ClientServiceHandler(ServerContext context, TransactionWatcher transactionWatcher,
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
new file mode 100644
index 0000000..6bebe14
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TabletLoadState;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.fate.util.LoggingRunnable;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.TabletStateStore;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.util.MasterMetadataUtil;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletData;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AssignmentHandler implements Runnable {
+  private static final Logger log = LoggerFactory.getLogger(AssignmentHandler.class);
+  private final KeyExtent extent;
+  private final int retryAttempt;
+  private final TabletServer server;
+
+  public AssignmentHandler(TabletServer server, KeyExtent extent) {
+    this(server, extent, 0);
+  }
+
+  public AssignmentHandler(TabletServer server, KeyExtent extent, int retryAttempt) {
+    this.server = server;
+    this.extent = extent;
+    this.retryAttempt = retryAttempt;
+  }
+
+  @Override
+  public void run() {
+    synchronized (server.unopenedTablets) {
+      synchronized (server.openingTablets) {
+        synchronized (server.onlineTablets) {
+          // nothing should be moving between sets, do a sanity
+          // check
+          Set<KeyExtent> unopenedOverlapping =
+              KeyExtent.findOverlapping(extent, server.unopenedTablets);
+          Set<KeyExtent> openingOverlapping =
+              KeyExtent.findOverlapping(extent, server.openingTablets);
+          Set<KeyExtent> onlineOverlapping =
+              KeyExtent.findOverlapping(extent, server.onlineTablets.snapshot());
+
+          if (openingOverlapping.contains(extent) || onlineOverlapping.contains(extent)) {
+            return;
+          }
+
+          if (!unopenedOverlapping.contains(extent)) {
+            log.info("assignment {} no longer in the unopened set", extent);
+            return;
+          }
+
+          if (unopenedOverlapping.size() != 1 || openingOverlapping.size() > 0
+              || onlineOverlapping.size() > 0) {
+            throw new IllegalStateException(
+                "overlaps assigned " + extent + " " + !server.unopenedTablets.contains(extent) + " "
+                    + unopenedOverlapping + " " + openingOverlapping + " " + onlineOverlapping);
+          }
+        }
+
+        server.unopenedTablets.remove(extent);
+        server.openingTablets.add(extent);
+      }
+    }
+
+    // check Metadata table before accepting assignment
+    Text locationToOpen = null;
+    TabletMetadata tabletMetadata = null;
+    boolean canLoad = false;
+    try {
+      tabletMetadata = server.getContext().getAmple().readTablet(extent);
+
+      canLoad = TabletServer.checkTabletMetadata(extent, server.getTabletSession(), tabletMetadata);
+
+      if (canLoad && tabletMetadata.sawOldPrevEndRow()) {
+        KeyExtent fixedExtent =
+            MasterMetadataUtil.fixSplit(server.getContext(), tabletMetadata, server.getLock());
+
+        synchronized (server.openingTablets) {
+          server.openingTablets.remove(extent);
+          server.openingTablets.notifyAll();
+          // it expected that the new extent will overlap the old one... if it does not, it
+          // should not be added to unopenedTablets
+          if (!KeyExtent.findOverlapping(extent, new TreeSet<>(Arrays.asList(fixedExtent)))
+              .contains(fixedExtent)) {
+            throw new IllegalStateException(
+                "Fixed split does not overlap " + extent + " " + fixedExtent);
+          }
+          server.unopenedTablets.add(fixedExtent);
+        }
+        // split was rolled back... try again
+        new AssignmentHandler(server, fixedExtent).run();
+        return;
+
+      }
+    } catch (Exception e) {
+      synchronized (server.openingTablets) {
+        server.openingTablets.remove(extent);
+        server.openingTablets.notifyAll();
+      }
+      log.warn("Failed to verify tablet " + extent, e);
+      server.enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
+      throw new RuntimeException(e);
+    }
+
+    if (!canLoad) {
+      log.debug("Reporting tablet {} assignment failure: unable to verify Tablet Information",
+          extent);
+      synchronized (server.openingTablets) {
+        server.openingTablets.remove(extent);
+        server.openingTablets.notifyAll();
+      }
+      server.enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
+      return;
+    }
+
+    Tablet tablet = null;
+    boolean successful = false;
+
+    try {
+      server.acquireRecoveryMemory(extent);
+
+      TabletResourceManager trm = server.resourceManager.createTabletResourceManager(extent,
+          server.getTableConfiguration(extent));
+      TabletData data = new TabletData(tabletMetadata);
+
+      tablet = new Tablet(server, extent, trm, data);
+      // If a minor compaction starts after a tablet opens, this indicates a log recovery
+      // occurred. This recovered data must be minor compacted.
+      // There are three reasons to wait for this minor compaction to finish before placing the
+      // tablet in online tablets.
+      //
+      // 1) The log recovery code does not handle data written to the tablet on multiple tablet
+      // servers.
+      // 2) The log recovery code does not block if memory is full. Therefore recovering lots of
+      // tablets that use a lot of memory could run out of memory.
+      // 3) The minor compaction finish event did not make it to the logs (the file will be in
+      // metadata, preventing replay of compacted data)... but do not
+      // want a majc to wipe the file out from metadata and then have another process failure...
+      // this could cause duplicate data to replay.
+      if (tablet.getNumEntriesInMemory() > 0
+          && !tablet.minorCompactNow(MinorCompactionReason.RECOVERY)) {
+        throw new RuntimeException("Minor compaction after recovery fails for " + extent);
+      }
+      Assignment assignment = new Assignment(extent, server.getTabletSession());
+      TabletStateStore.setLocation(server.getContext(), assignment);
+
+      synchronized (server.openingTablets) {
+        synchronized (server.onlineTablets) {
+          server.openingTablets.remove(extent);
+          server.onlineTablets.put(extent, tablet);
+          server.openingTablets.notifyAll();
+          server.recentlyUnloadedCache.remove(tablet.getExtent());
+        }
+      }
+      tablet = null; // release this reference
+      successful = true;
+    } catch (Throwable e) {
+      log.warn("exception trying to assign tablet {} {}", extent, locationToOpen, e);
+
+      if (e.getMessage() != null) {
+        log.warn("{}", e.getMessage());
+      }
+
+      TableId tableId = extent.getTableId();
+      ProblemReports.getInstance(server.getContext()).report(new ProblemReport(tableId, TABLET_LOAD,
+          extent.getUUID().toString(), server.getClientAddressString(), e));
+    } finally {
+      server.releaseRecoveryMemory(extent);
+    }
+
+    if (!successful) {
+      synchronized (server.unopenedTablets) {
+        synchronized (server.openingTablets) {
+          server.openingTablets.remove(extent);
+          server.unopenedTablets.add(extent);
+          server.openingTablets.notifyAll();
+        }
+      }
+      log.warn("failed to open tablet {} reporting failure to master", extent);
+      server.enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
+      long reschedule = Math.min((1L << Math.min(32, retryAttempt)) * 1000, 10 * 60 * 1000L);
+      log.warn(String.format("rescheduling tablet load in %.2f seconds", reschedule / 1000.));
+      SimpleTimer.getInstance(server.getConfiguration()).schedule(new TimerTask() {
+        @Override
+        public void run() {
+          log.info("adding tablet {} back to the assignment pool (retry {})", extent, retryAttempt);
+          AssignmentHandler handler = new AssignmentHandler(server, extent, retryAttempt + 1);
+          if (extent.isMeta()) {
+            if (extent.isRootTablet()) {
+              new Daemon(new LoggingRunnable(log, handler), "Root tablet assignment retry").start();
+            } else {
+              server.resourceManager.addMetaDataAssignment(extent, log, handler);
+            }
+          } else {
+            server.resourceManager.addAssignment(extent, log, handler);
+          }
+        }
+      }, reschedule);
+    } else {
+      server.enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOADED, extent));
+    }
+  }
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 4ba635c..0a6d359 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -19,15 +19,11 @@
 package org.apache.accumulo.tserver;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
-import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
 
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,113 +41,43 @@ import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
-import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Durability;
-import org.apache.accumulo.core.client.SampleNotPresentException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.clientImpl.CompressedIterators;
 import org.apache.accumulo.core.clientImpl.DurabilityImpl;
-import org.apache.accumulo.core.clientImpl.Tables;
 import org.apache.accumulo.core.clientImpl.TabletLocator;
-import org.apache.accumulo.core.clientImpl.TabletType;
-import org.apache.accumulo.core.clientImpl.Translator;
-import org.apache.accumulo.core.clientImpl.Translator.TKeyExtentTranslator;
-import org.apache.accumulo.core.clientImpl.Translator.TRangeTranslator;
-import org.apache.accumulo.core.clientImpl.Translators;
-import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
-import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
-import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Column;
-import org.apache.accumulo.core.data.ConstraintViolationSummary;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.NamespaceId;
-import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
-import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
-import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
-import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
-import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
-import org.apache.accumulo.core.dataImpl.thrift.ScanResult;
-import org.apache.accumulo.core.dataImpl.thrift.TCMResult;
-import org.apache.accumulo.core.dataImpl.thrift.TCMStatus;
-import org.apache.accumulo.core.dataImpl.thrift.TColumn;
-import org.apache.accumulo.core.dataImpl.thrift.TConditionalMutation;
-import org.apache.accumulo.core.dataImpl.thrift.TConditionalSession;
-import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
-import org.apache.accumulo.core.dataImpl.thrift.TKeyValue;
-import org.apache.accumulo.core.dataImpl.thrift.TMutation;
-import org.apache.accumulo.core.dataImpl.thrift.TRange;
-import org.apache.accumulo.core.dataImpl.thrift.TRowRange;
-import org.apache.accumulo.core.dataImpl.thrift.TSummaries;
-import org.apache.accumulo.core.dataImpl.thrift.TSummaryRequest;
-import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors;
-import org.apache.accumulo.core.iterators.IterationInterruptedException;
-import org.apache.accumulo.core.logging.TabletLogger;
 import org.apache.accumulo.core.master.thrift.BulkImportState;
 import org.apache.accumulo.core.master.thrift.Compacting;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.master.thrift.TableInfo;
-import org.apache.accumulo.core.master.thrift.TabletLoadState;
 import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
 import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
 import org.apache.accumulo.core.rpc.ThriftUtil;
-import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
-import org.apache.accumulo.core.spi.cache.BlockCache;
-import org.apache.accumulo.core.spi.scan.ScanDispatcher;
-import org.apache.accumulo.core.summary.Gatherer;
-import org.apache.accumulo.core.summary.Gatherer.FileSystemResolver;
-import org.apache.accumulo.core.summary.SummaryCollection;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
-import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
-import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
-import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
-import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
-import org.apache.accumulo.core.tabletserver.thrift.TDurability;
-import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
-import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
-import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
-import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 import org.apache.accumulo.core.trace.TraceUtil;
-import org.apache.accumulo.core.trace.thrift.TInfo;
-import org.apache.accumulo.core.util.ByteBufferUtil;
 import org.apache.accumulo.core.util.ComparablePair;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.HostAndPort;
@@ -172,7 +98,6 @@ import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
 import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.GarbageCollectionLogger;
@@ -180,9 +105,7 @@ import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.ServerOpts;
 import org.apache.accumulo.server.TabletLevel;
-import org.apache.accumulo.server.client.ClientServiceHandler;
 import org.apache.accumulo.server.conf.TableConfiguration;
-import org.apache.accumulo.server.data.ServerMutation;
 import org.apache.accumulo.server.fs.VolumeChooserEnvironment;
 import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -190,15 +113,7 @@ import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.server.log.WalStateManager;
 import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
 import org.apache.accumulo.server.master.recovery.RecoveryPath;
-import org.apache.accumulo.server.master.state.Assignment;
-import org.apache.accumulo.server.master.state.DistributedStoreException;
 import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.master.state.TabletLocationState;
-import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
-import org.apache.accumulo.server.master.state.TabletStateStore;
-import org.apache.accumulo.server.master.tableOps.UserCompactionConfig;
-import org.apache.accumulo.server.problems.ProblemReport;
-import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.replication.ZooKeeperInitialization;
 import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
@@ -211,93 +126,63 @@ import org.apache.accumulo.server.security.delegation.AuthenticationTokenSecretM
 import org.apache.accumulo.server.security.delegation.ZooAuthenticationKeyWatcher;
 import org.apache.accumulo.server.util.FileSystemMonitor;
 import org.apache.accumulo.server.util.Halt;
-import org.apache.accumulo.server.util.MasterMetadataUtil;
 import org.apache.accumulo.server.util.ServerBulkImportStatus;
 import org.apache.accumulo.server.util.time.RelativeTime;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
-import org.apache.accumulo.server.zookeeper.TransactionWatcher;
 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 import org.apache.accumulo.start.classloader.vfs.ContextManager;
-import org.apache.accumulo.tserver.ConditionCheckerContext.ConditionChecker;
-import org.apache.accumulo.tserver.RowLocks.RowLock;
 import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
 import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
 import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
-import org.apache.accumulo.tserver.data.ServerConditionalMutation;
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.LogSorter;
 import org.apache.accumulo.tserver.log.MutationReceiver;
 import org.apache.accumulo.tserver.log.TabletServerLogger;
 import org.apache.accumulo.tserver.mastermessage.MasterMessage;
 import org.apache.accumulo.tserver.mastermessage.SplitReportMessage;
-import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
 import org.apache.accumulo.tserver.metrics.TabletServerMetrics;
 import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
 import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
 import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics;
 import org.apache.accumulo.tserver.replication.ReplicationServicerHandler;
 import org.apache.accumulo.tserver.replication.ReplicationWorker;
-import org.apache.accumulo.tserver.scan.LookupTask;
-import org.apache.accumulo.tserver.scan.NextBatchTask;
-import org.apache.accumulo.tserver.scan.ScanParameters;
 import org.apache.accumulo.tserver.scan.ScanRunState;
-import org.apache.accumulo.tserver.session.ConditionalSession;
-import org.apache.accumulo.tserver.session.MultiScanSession;
 import org.apache.accumulo.tserver.session.Session;
 import org.apache.accumulo.tserver.session.SessionManager;
-import org.apache.accumulo.tserver.session.SingleScanSession;
-import org.apache.accumulo.tserver.session.SummarySession;
-import org.apache.accumulo.tserver.session.UpdateSession;
 import org.apache.accumulo.tserver.tablet.BulkImportCacheCleaner;
 import org.apache.accumulo.tserver.tablet.CommitSession;
-import org.apache.accumulo.tserver.tablet.CompactionInfo;
 import org.apache.accumulo.tserver.tablet.CompactionWatcher;
-import org.apache.accumulo.tserver.tablet.Compactor;
-import org.apache.accumulo.tserver.tablet.KVEntry;
-import org.apache.accumulo.tserver.tablet.PreparedMutations;
-import org.apache.accumulo.tserver.tablet.ScanBatch;
 import org.apache.accumulo.tserver.tablet.Tablet;
-import org.apache.accumulo.tserver.tablet.TabletClosedException;
 import org.apache.accumulo.tserver.tablet.TabletData;
 import org.apache.commons.collections4.map.LRUMap;
-import org.apache.hadoop.fs.FSError;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.TServiceClient;
 import org.apache.thrift.server.TServer;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
-import com.google.common.collect.Collections2;
 
 public class TabletServer extends AbstractServer {
 
   private static final Logger log = LoggerFactory.getLogger(TabletServer.class);
-  private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
-  private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
   private static final long TIME_BETWEEN_GC_CHECKS = 5000;
   private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = 60 * 60 * 1000;
 
-  private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
-  private final TransactionWatcher watcher;
-  private ZooCache masterLockCache;
+  final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
+  final ZooCache masterLockCache;
 
-  private final TabletServerLogger logger;
+  final TabletServerLogger logger;
 
-  private final TabletServerUpdateMetrics updateMetrics;
-  private final TabletServerScanMetrics scanMetrics;
-  private final TabletServerMinCMetrics mincMetrics;
+  final TabletServerUpdateMetrics updateMetrics;
+  final TabletServerScanMetrics scanMetrics;
+  final TabletServerMinCMetrics mincMetrics;
 
   public TabletServerScanMetrics getScanMetrics() {
     return scanMetrics;
@@ -309,7 +194,7 @@ public class TabletServer extends AbstractServer {
 
   private final LogSorter logSorter;
   private ReplicationWorker replWorker = null;
-  private final TabletStatsKeeper statsKeeper;
+  final TabletStatsKeeper statsKeeper;
   private final AtomicInteger logIdGenerator = new AtomicInteger();
 
   private final AtomicLong flushCounter = new AtomicLong(0);
@@ -317,22 +202,19 @@ public class TabletServer extends AbstractServer {
 
   private final VolumeManager fs;
 
-  private final OnlineTablets onlineTablets = new OnlineTablets();
-  private final SortedSet<KeyExtent> unopenedTablets =
-      Collections.synchronizedSortedSet(new TreeSet<>());
-  private final SortedSet<KeyExtent> openingTablets =
-      Collections.synchronizedSortedSet(new TreeSet<>());
-  private final Map<KeyExtent,Long> recentlyUnloadedCache =
-      Collections.synchronizedMap(new LRUMap<>(1000));
+  final OnlineTablets onlineTablets = new OnlineTablets();
+  final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<>());
+  final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<>());
+  final Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new LRUMap<>(1000));
 
-  private final TabletServerResourceManager resourceManager;
+  final TabletServerResourceManager resourceManager;
   private final SecurityOperation security;
 
   private final BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<>();
 
   private Thread majorCompactorThread;
 
-  private HostAndPort clientAddress;
+  HostAndPort clientAddress;
 
   private volatile boolean serverStopRequested = false;
   private volatile boolean shutdownComplete = false;
@@ -364,7 +246,6 @@ public class TabletServer extends AbstractServer {
     ServerContext context = super.getContext();
     context.setupCrypto();
     this.masterLockCache = new ZooCache(context.getZooReaderWriter(), null);
-    this.watcher = new TransactionWatcher(context);
     this.fs = context.getVolumeManager();
     final AccumuloConfiguration aconf = getConfiguration();
     log.info("Version " + Constants.VERSION);
@@ -493,1649 +374,19 @@ public class TabletServer extends AbstractServer {
     return (long) ((1. + (r.nextDouble() / 10)) * TabletServer.TIME_BETWEEN_LOCATOR_CACHE_CLEARS);
   }
 
-  private final SessionManager sessionManager;
+  final SessionManager sessionManager;
 
-  private final WriteTracker writeTracker = new WriteTracker();
-
-  private final RowLocks rowLocks = new RowLocks();
-
-  private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
-  private final ReentrantLock recoveryLock = new ReentrantLock(true);
-  private ThriftClientHandler clientHandler;
-  private final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();
-
-  private class ThriftClientHandler extends ClientServiceHandler
-      implements TabletClientService.Iface {
-
-    ThriftClientHandler() {
-      super(getContext(), watcher, fs);
-      log.debug("{} created", ThriftClientHandler.class.getName());
-    }
-
-    @Override
-    public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, final long tid,
-        final Map<TKeyExtent,Map<String,MapFileInfo>> files, final boolean setTime)
-        throws ThriftSecurityException {
-
-      if (!security.canPerformSystemActions(credentials)) {
-        throw new ThriftSecurityException(credentials.getPrincipal(),
-            SecurityErrorCode.PERMISSION_DENIED);
-      }
-
-      try {
-        return watcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, () -> {
-          List<TKeyExtent> failures = new ArrayList<>();
-
-          for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
-            TKeyExtent tke = entry.getKey();
-            Map<String,MapFileInfo> fileMap = entry.getValue();
-            Map<TabletFile,MapFileInfo> fileRefMap = new HashMap<>();
-            for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
-              Path path = new Path(mapping.getKey());
-              FileSystem ns = fs.getFileSystemByPath(path);
-              path = ns.makeQualified(path);
-              fileRefMap.put(new TabletFile(path), mapping.getValue());
-            }
-
-            Tablet importTablet = getOnlineTablet(new KeyExtent(tke));
-
-            if (importTablet == null) {
-              failures.add(tke);
-            } else {
-              try {
-                importTablet.importMapFiles(tid, fileRefMap, setTime);
-              } catch (IOException ioe) {
-                log.info("files {} not imported to {}: {}", fileMap.keySet(), new KeyExtent(tke),
-                    ioe.getMessage());
-                failures.add(tke);
-              }
-            }
-          }
-          return failures;
-        });
-      } catch (RuntimeException e) {
-        throw e;
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public void loadFiles(TInfo tinfo, TCredentials credentials, long tid, String dir,
-        Map<TKeyExtent,Map<String,MapFileInfo>> tabletImports, boolean setTime)
-        throws ThriftSecurityException {
-      if (!security.canPerformSystemActions(credentials)) {
-        throw new ThriftSecurityException(credentials.getPrincipal(),
-            SecurityErrorCode.PERMISSION_DENIED);
-      }
-
-      watcher.runQuietly(Constants.BULK_ARBITRATOR_TYPE, tid, () -> {
-        tabletImports.forEach((tke, fileMap) -> {
-          Map<TabletFile,MapFileInfo> newFileMap = new HashMap<>();
-          for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
-            Path path = new Path(dir, mapping.getKey());
-            FileSystem ns = fs.getFileSystemByPath(path);
-            path = ns.makeQualified(path);
-            newFileMap.put(new TabletFile(path), mapping.getValue());
-          }
-
-          Tablet importTablet = getOnlineTablet(new KeyExtent(tke));
-
-          if (importTablet != null) {
-            try {
-              importTablet.importMapFiles(tid, newFileMap, setTime);
-            } catch (IOException ioe) {
-              log.debug("files {} not imported to {}: {}", fileMap.keySet(), new KeyExtent(tke),
-                  ioe.getMessage());
-            }
-          }
-        });
-      });
-
-    }
-
-    private ScanDispatcher getScanDispatcher(KeyExtent extent) {
-      if (extent.isRootTablet() || extent.isMeta()) {
-        // dispatcher is only for user tables
-        return null;
-      }
-
-      return getContext().getTableConfiguration(extent.getTableId()).getScanDispatcher();
-    }
-
-    @Override
-    public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent,
-        TRange range, List<TColumn> columns, int batchSize, List<IterInfo> ssiList,
-        Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites,
-        boolean isolated, long readaheadThreshold, TSamplerConfiguration tSamplerConfig,
-        long batchTimeOut, String contextArg, Map<String,String> executionHints)
-        throws NotServingTabletException, ThriftSecurityException,
-        org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
-        TSampleNotPresentException {
-
-      TableId tableId = TableId.of(new String(textent.getTable(), UTF_8));
-      NamespaceId namespaceId;
-      try {
-        namespaceId = Tables.getNamespaceId(getContext(), tableId);
-      } catch (TableNotFoundException e1) {
-        throw new NotServingTabletException(textent);
-      }
-      if (!security.canScan(credentials, tableId, namespaceId, range, columns, ssiList, ssio,
-          authorizations)) {
-        throw new ThriftSecurityException(credentials.getPrincipal(),
-            SecurityErrorCode.PERMISSION_DENIED);
-      }
-
-      if (!security.authenticatedUserHasAuthorizations(credentials, authorizations)) {
-        throw new ThriftSecurityException(credentials.getPrincipal(),
-            SecurityErrorCode.BAD_AUTHORIZATIONS);
-      }
-
-      final KeyExtent extent = new KeyExtent(textent);
-
-      // wait for any writes that are in flight.. this done to ensure
-      // consistency across client restarts... assume a client writes
-      // to accumulo and dies while waiting for a confirmation from
-      // accumulo... the client process restarts and tries to read
-      // data from accumulo making the assumption that it will get
-      // any writes previously made... however if the server side thread
-      // processing the write from the dead client is still in progress,
-      // the restarted client may not see the write unless we wait here.
-      // this behavior is very important when the client is reading the
-      // metadata
-      if (waitForWrites) {
-        writeTracker.waitForWrites(TabletType.type(extent));
-      }
-
-      Tablet tablet = getOnlineTablet(extent);
-      if (tablet == null) {
-        throw new NotServingTabletException(textent);
-      }
-
-      HashSet<Column> columnSet = new HashSet<>();
-      for (TColumn tcolumn : columns) {
-        columnSet.add(new Column(tcolumn));
-      }
-
-      ScanParameters scanParams = new ScanParameters(batchSize, new Authorizations(authorizations),
-          columnSet, ssiList, ssio, isolated, SamplerConfigurationImpl.fromThrift(tSamplerConfig),
-          batchTimeOut, contextArg);
-
-      final SingleScanSession scanSession = new SingleScanSession(credentials, extent, scanParams,
-          readaheadThreshold, executionHints);
-      scanSession.scanner =
-          tablet.createScanner(new Range(range), scanParams, scanSession.interruptFlag);
-
-      long sid = sessionManager.createSession(scanSession, true);
-
-      ScanResult scanResult;
-      try {
-        scanResult = continueScan(tinfo, sid, scanSession);
-      } catch (NoSuchScanIDException e) {
-        log.error("The impossible happened", e);
-        throw new RuntimeException();
-      } finally {
-        sessionManager.unreserveSession(sid);
-      }
-
-      return new InitialScan(sid, scanResult);
-    }
-
-    @Override
-    public ScanResult continueScan(TInfo tinfo, long scanID)
-        throws NoSuchScanIDException, NotServingTabletException,
-        org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
-        TSampleNotPresentException {
-      SingleScanSession scanSession = (SingleScanSession) sessionManager.reserveSession(scanID);
-      if (scanSession == null) {
-        throw new NoSuchScanIDException();
-      }
-
-      try {
-        return continueScan(tinfo, scanID, scanSession);
-      } finally {
-        sessionManager.unreserveSession(scanSession);
-      }
-    }
-
-    private ScanResult continueScan(TInfo tinfo, long scanID, SingleScanSession scanSession)
-        throws NoSuchScanIDException, NotServingTabletException,
-        org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
-        TSampleNotPresentException {
-
-      if (scanSession.nextBatchTask == null) {
-        scanSession.nextBatchTask =
-            new NextBatchTask(TabletServer.this, scanID, scanSession.interruptFlag);
-        resourceManager.executeReadAhead(scanSession.extent, getScanDispatcher(scanSession.extent),
-            scanSession, scanSession.nextBatchTask);
-      }
-
-      ScanBatch bresult;
-      try {
-        bresult = scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS,
-            TimeUnit.MILLISECONDS);
-        scanSession.nextBatchTask = null;
-      } catch (ExecutionException e) {
-        sessionManager.removeSession(scanID);
-        if (e.getCause() instanceof NotServingTabletException) {
-          throw (NotServingTabletException) e.getCause();
-        } else if (e.getCause() instanceof TooManyFilesException) {
-          throw new org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(
-              scanSession.extent.toThrift());
-        } else if (e.getCause() instanceof SampleNotPresentException) {
-          throw new TSampleNotPresentException(scanSession.extent.toThrift());
-        } else if (e.getCause() instanceof IOException) {
-          sleepUninterruptibly(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
-          List<KVEntry> empty = Collections.emptyList();
-          bresult = new ScanBatch(empty, true);
-          scanSession.nextBatchTask = null;
-        } else {
-          throw new RuntimeException(e);
-        }
-      } catch (CancellationException ce) {
-        sessionManager.removeSession(scanID);
-        Tablet tablet = getOnlineTablet(scanSession.extent);
-        if (tablet == null || tablet.isClosed()) {
-          throw new NotServingTabletException(scanSession.extent.toThrift());
-        } else {
-          throw new NoSuchScanIDException();
-        }
-      } catch (TimeoutException e) {
-        List<TKeyValue> param = Collections.emptyList();
-        long timeout =
-            TabletServer.this.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
-        sessionManager.removeIfNotAccessed(scanID, timeout);
-        return new ScanResult(param, true);
-      } catch (Throwable t) {
-        sessionManager.removeSession(scanID);
-        log.warn("Failed to get next batch", t);
-        throw new RuntimeException(t);
-      }
-
-      ScanResult scanResult = new ScanResult(Key.compress(bresult.getResults()), bresult.isMore());
-
-      scanSession.entriesReturned += scanResult.results.size();
-
-      scanSession.batchCount++;
-
-      if (scanResult.more && scanSession.batchCount > scanSession.readaheadThreshold) {
-        // start reading next batch while current batch is transmitted
-        // to client
-        scanSession.nextBatchTask =
-            new NextBatchTask(TabletServer.this, scanID, scanSession.interruptFlag);
-        resourceManager.executeReadAhead(scanSession.extent, getScanDispatcher(scanSession.extent),
-            scanSession, scanSession.nextBatchTask);
-      }
-
-      if (!scanResult.more) {
-        closeScan(tinfo, scanID);
-      }
-
-      return scanResult;
-    }
-
-    @Override
-    public void closeScan(TInfo tinfo, long scanID) {
-      final SingleScanSession ss = (SingleScanSession) sessionManager.removeSession(scanID);
-      if (ss != null) {
-        long t2 = System.currentTimeMillis();
-
-        if (log.isTraceEnabled()) {
-          log.trace(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ",
-              TServerUtils.clientAddress.get(), ss.extent.getTableId(), ss.entriesReturned,
-              (t2 - ss.startTime) / 1000.0, ss.runStats.toString()));
-        }
-
-        scanMetrics.addScan(t2 - ss.startTime);
-        scanMetrics.addResult(ss.entriesReturned);
-      }
-    }
-
-    @Override
-    public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials,
-        Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns, List<IterInfo> ssiList,
-        Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites,
-        TSamplerConfiguration tSamplerConfig, long batchTimeOut, String contextArg,
-        Map<String,String> executionHints)
-        throws ThriftSecurityException, TSampleNotPresentException {
-      // find all of the tables that need to be scanned
-      final HashSet<TableId> tables = new HashSet<>();
-      for (TKeyExtent keyExtent : tbatch.keySet()) {
-        tables.add(TableId.of(new String(keyExtent.getTable(), UTF_8)));
-      }
-
-      if (tables.size() != 1) {
-        throw new IllegalArgumentException("Cannot batch scan over multiple tables");
-      }
-
-      // check if user has permission to the tables
-      for (TableId tableId : tables) {
-        NamespaceId namespaceId = getNamespaceId(credentials, tableId);
-        if (!security.canScan(credentials, tableId, namespaceId, tbatch, tcolumns, ssiList, ssio,
-            authorizations)) {
-          throw new ThriftSecurityException(credentials.getPrincipal(),
-              SecurityErrorCode.PERMISSION_DENIED);
-        }
-      }
-
-      try {
-        if (!security.authenticatedUserHasAuthorizations(credentials, authorizations)) {
-          throw new ThriftSecurityException(credentials.getPrincipal(),
-              SecurityErrorCode.BAD_AUTHORIZATIONS);
-        }
-      } catch (ThriftSecurityException tse) {
-        log.error("{} is not authorized", credentials.getPrincipal(), tse);
-        throw tse;
-      }
-      Map<KeyExtent,List<Range>> batch = Translator.translate(tbatch, new TKeyExtentTranslator(),
-          new Translator.ListTranslator<>(new TRangeTranslator()));
-
-      // This is used to determine which thread pool to use
-      KeyExtent threadPoolExtent = batch.keySet().iterator().next();
-
-      if (waitForWrites) {
-        writeTracker.waitForWrites(TabletType.type(batch.keySet()));
-      }
-
-      Set<Column> columnSet = tcolumns.isEmpty() ? Collections.emptySet()
-          : new HashSet<>(Collections2.transform(tcolumns, Column::new));
-
-      ScanParameters scanParams =
-          new ScanParameters(-1, new Authorizations(authorizations), columnSet, ssiList, ssio,
-              false, SamplerConfigurationImpl.fromThrift(tSamplerConfig), batchTimeOut, contextArg);
-
-      final MultiScanSession mss =
-          new MultiScanSession(credentials, threadPoolExtent, batch, scanParams, executionHints);
-
-      mss.numTablets = batch.size();
-      for (List<Range> ranges : batch.values()) {
-        mss.numRanges += ranges.size();
-      }
-
-      long sid = sessionManager.createSession(mss, true);
-
-      MultiScanResult result;
-      try {
-        result = continueMultiScan(sid, mss);
-      } finally {
-        sessionManager.unreserveSession(sid);
-      }
-
-      return new InitialMultiScan(sid, result);
-    }
-
-    @Override
-    public MultiScanResult continueMultiScan(TInfo tinfo, long scanID)
-        throws NoSuchScanIDException, TSampleNotPresentException {
-
-      MultiScanSession session = (MultiScanSession) sessionManager.reserveSession(scanID);
-
-      if (session == null) {
-        throw new NoSuchScanIDException();
-      }
-
-      try {
-        return continueMultiScan(scanID, session);
-      } finally {
-        sessionManager.unreserveSession(session);
-      }
-    }
-
-    private MultiScanResult continueMultiScan(long scanID, MultiScanSession session)
-        throws TSampleNotPresentException {
-
-      if (session.lookupTask == null) {
-        session.lookupTask = new LookupTask(TabletServer.this, scanID);
-        resourceManager.executeReadAhead(session.threadPoolExtent,
-            getScanDispatcher(session.threadPoolExtent), session, session.lookupTask);
-      }
-
-      try {
-        MultiScanResult scanResult =
-            session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
-        session.lookupTask = null;
-        return scanResult;
-      } catch (ExecutionException e) {
-        sessionManager.removeSession(scanID);
-        if (e.getCause() instanceof SampleNotPresentException) {
-          throw new TSampleNotPresentException();
-        } else {
-          log.warn("Failed to get multiscan result", e);
-          throw new RuntimeException(e);
-        }
-      } catch (TimeoutException e1) {
-        long timeout =
-            TabletServer.this.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
-        sessionManager.removeIfNotAccessed(scanID, timeout);
-        List<TKeyValue> results = Collections.emptyList();
-        Map<TKeyExtent,List<TRange>> failures = Collections.emptyMap();
-        List<TKeyExtent> fullScans = Collections.emptyList();
-        return new MultiScanResult(results, failures, fullScans, null, null, false, true);
-      } catch (Throwable t) {
-        sessionManager.removeSession(scanID);
-        log.warn("Failed to get multiscan result", t);
-        throw new RuntimeException(t);
-      }
-    }
-
-    @Override
-    public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
-      MultiScanSession session = (MultiScanSession) sessionManager.removeSession(scanID);
-      if (session == null) {
-        throw new NoSuchScanIDException();
-      }
-
-      long t2 = System.currentTimeMillis();
-
-      if (log.isTraceEnabled()) {
-        log.trace(String.format(
-            "MultiScanSess %s %,d entries in %.2f secs"
-                + " (lookup_time:%.2f secs tablets:%,d ranges:%,d) ",
-            TServerUtils.clientAddress.get(), session.numEntries, (t2 - session.startTime) / 1000.0,
-            session.totalLookupTime / 1000.0, session.numTablets, session.numRanges));
-      }
-    }
-
-    @Override
-    public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability tdurabilty)
-        throws ThriftSecurityException {
-      // Make sure user is real
-      Durability durability = DurabilityImpl.fromThrift(tdurabilty);
-      security.authenticateUser(credentials, credentials);
-      updateMetrics.addPermissionErrors(0);
-
-      UpdateSession us = new UpdateSession(
-          new TservConstraintEnv(getContext(), security, credentials), credentials, durability);
-      return sessionManager.createSession(us, false);
-    }
-
-    private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
-      long t1 = System.currentTimeMillis();
-      if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent)) {
-        return;
-      }
-      if (us.currentTablet == null
-          && (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) {
-        // if there were previous failures, then do not accept additional writes
-        return;
-      }
-
-      TableId tableId = null;
-      try {
-        // if user has no permission to write to this table, add it to
-        // the failures list
-        boolean sameTable = us.currentTablet != null
-            && (us.currentTablet.getExtent().getTableId().equals(keyExtent.getTableId()));
-        tableId = keyExtent.getTableId();
-        if (sameTable || security.canWrite(us.getCredentials(), tableId,
-            Tables.getNamespaceId(getContext(), tableId))) {
-          long t2 = System.currentTimeMillis();
-          us.authTimes.addStat(t2 - t1);
-          us.currentTablet = getOnlineTablet(keyExtent);
-          if (us.currentTablet != null) {
-            us.queuedMutations.put(us.currentTablet, new ArrayList<>());
-          } else {
-            // not serving tablet, so report all mutations as
-            // failures
-            us.failures.put(keyExtent, 0L);
-            updateMetrics.addUnknownTabletErrors(0);
-          }
-        } else {
-          log.warn("Denying access to table {} for user {}", keyExtent.getTableId(), us.getUser());
-          long t2 = System.currentTimeMillis();
-          us.authTimes.addStat(t2 - t1);
-          us.currentTablet = null;
-          us.authFailures.put(keyExtent, SecurityErrorCode.PERMISSION_DENIED);
-          updateMetrics.addPermissionErrors(0);
-          return;
-        }
-      } catch (TableNotFoundException tnfe) {
-        log.error("Table " + tableId + " not found ", tnfe);
-        long t2 = System.currentTimeMillis();
-        us.authTimes.addStat(t2 - t1);
-        us.currentTablet = null;
-        us.authFailures.put(keyExtent, SecurityErrorCode.TABLE_DOESNT_EXIST);
-        updateMetrics.addUnknownTabletErrors(0);
-        return;
-      } catch (ThriftSecurityException e) {
-        log.error("Denying permission to check user " + us.getUser() + " with user " + e.getUser(),
-            e);
-        long t2 = System.currentTimeMillis();
-        us.authTimes.addStat(t2 - t1);
-        us.currentTablet = null;
-        us.authFailures.put(keyExtent, e.getCode());
-        updateMetrics.addPermissionErrors(0);
-        return;
-      }
-    }
-
-    @Override
-    public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
-        List<TMutation> tmutations) {
-      UpdateSession us = (UpdateSession) sessionManager.reserveSession(updateID);
-      if (us == null) {
-        return;
-      }
-
-      boolean reserved = true;
-      try {
-        KeyExtent keyExtent = new KeyExtent(tkeyExtent);
-        setUpdateTablet(us, keyExtent);
-
-        if (us.currentTablet != null) {
-          long additionalMutationSize = 0;
-          List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
-          for (TMutation tmutation : tmutations) {
-            Mutation mutation = new ServerMutation(tmutation);
-            mutations.add(mutation);
-            additionalMutationSize += mutation.numBytes();
-          }
-          us.queuedMutationSize += additionalMutationSize;
-          long totalQueued = updateTotalQueuedMutationSize(additionalMutationSize);
-          long total = TabletServer.this.getConfiguration()
-              .getAsBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);
-          if (totalQueued > total) {
-            try {
-              flush(us);
-            } catch (HoldTimeoutException hte) {
-              // Assumption is that the client has timed out and is gone. If that's not the case,
-              // then removing the session should cause the client to fail
-              // in such a way that it retries.
-              log.debug("HoldTimeoutException during applyUpdates, removing session");
-              sessionManager.removeSession(updateID, true);
-              reserved = false;
-            }
-          }
-        }
-      } finally {
-        if (reserved) {
-          sessionManager.unreserveSession(us);
-        }
-      }
-    }
-
-    private void flush(UpdateSession us) {
-
-      int mutationCount = 0;
-      Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
-      Map<CommitSession,TabletMutations> loggables = new HashMap<>();
-      Throwable error = null;
-
-      long pt1 = System.currentTimeMillis();
-
-      boolean containsMetadataTablet = false;
-      for (Tablet tablet : us.queuedMutations.keySet()) {
-        if (tablet.getExtent().isMeta()) {
-          containsMetadataTablet = true;
-        }
-      }
-
-      if (!containsMetadataTablet && us.queuedMutations.size() > 0) {
-        TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
-      }
-
-      try (TraceScope prep = Trace.startSpan("prep")) {
-        for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
-
-          Tablet tablet = entry.getKey();
-          Durability durability =
-              DurabilityImpl.resolveDurabilty(us.durability, tablet.getDurability());
-          List<Mutation> mutations = entry.getValue();
-          if (mutations.size() > 0) {
-            try {
-              updateMetrics.addMutationArraySize(mutations.size());
-
-              PreparedMutations prepared = tablet.prepareMutationsForCommit(us.cenv, mutations);
-
-              if (prepared.tabletClosed()) {
-                if (us.currentTablet == tablet) {
-                  us.currentTablet = null;
-                }
-                us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
-              } else {
-                if (!prepared.getNonViolators().isEmpty()) {
-                  List<Mutation> validMutations = prepared.getNonViolators();
-                  CommitSession session = prepared.getCommitSession();
-                  if (durability != Durability.NONE) {
-                    loggables.put(session,
-                        new TabletMutations(session, validMutations, durability));
-                  }
-                  sendables.put(session, validMutations);
-                }
-
-                if (!prepared.getViolations().isEmpty()) {
-                  us.violations.add(prepared.getViolations());
-                  updateMetrics.addConstraintViolations(0);
-                }
-                // Use the size of the original mutation list, regardless of how many mutations
-                // did not violate constraints.
-                mutationCount += mutations.size();
-
-              }
-            } catch (Throwable t) {
-              error = t;
-              log.error("Unexpected error preparing for commit", error);
-              break;
-            }
-          }
-        }
-      }
-
-      long pt2 = System.currentTimeMillis();
-      us.prepareTimes.addStat(pt2 - pt1);
-      updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size());
-
-      if (error != null) {
-        sendables.forEach((commitSession, value) -> commitSession.abortCommit());
-        throw new RuntimeException(error);
-      }
-      try {
-        try (TraceScope wal = Trace.startSpan("wal")) {
-          while (true) {
-            try {
-              long t1 = System.currentTimeMillis();
-
-              logger.logManyTablets(loggables);
-
-              long t2 = System.currentTimeMillis();
-              us.walogTimes.addStat(t2 - t1);
-              updateWalogWriteTime((t2 - t1));
-              break;
-            } catch (IOException | FSError ex) {
-              log.warn("logging mutations failed, retrying");
-            } catch (Throwable t) {
-              log.error("Unknown exception logging mutations, counts"
-                  + " for mutations in flight not decremented!", t);
-              throw new RuntimeException(t);
-            }
-          }
-        }
-
-        try (TraceScope commit = Trace.startSpan("commit")) {
-          long t1 = System.currentTimeMillis();
-          sendables.forEach((commitSession, mutations) -> {
-            commitSession.commit(mutations);
-            KeyExtent extent = commitSession.getExtent();
-
-            if (us.currentTablet != null && extent == us.currentTablet.getExtent()) {
-              // because constraint violations may filter out some
-              // mutations, for proper accounting with the client code,
-              // need to increment the count based on the original
-              // number of mutations from the client NOT the filtered number
-              us.successfulCommits.increment(us.currentTablet,
-                  us.queuedMutations.get(us.currentTablet).size());
-            }
-          });
-          long t2 = System.currentTimeMillis();
-
-          us.flushTime += (t2 - pt1);
-          us.commitTimes.addStat(t2 - t1);
-
-          updateAvgCommitTime(t2 - t1, sendables.size());
-        }
-      } finally {
-        us.queuedMutations.clear();
-        if (us.currentTablet != null) {
-          us.queuedMutations.put(us.currentTablet, new ArrayList<>());
-        }
-        updateTotalQueuedMutationSize(-us.queuedMutationSize);
-        us.queuedMutationSize = 0;
-      }
-      us.totalUpdates += mutationCount;
-    }
-
-    private void updateWalogWriteTime(long time) {
-      updateMetrics.addWalogWriteTime(time);
-    }
-
-    private void updateAvgCommitTime(long time, int size) {
-      if (size > 0)
-        updateMetrics.addCommitTime((long) (time / (double) size));
-    }
-
-    private void updateAvgPrepTime(long time, int size) {
-      if (size > 0)
-        updateMetrics.addCommitPrep((long) (time / (double) size));
-    }
-
-    @Override
-    public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException {
-      final UpdateSession us = (UpdateSession) sessionManager.removeSession(updateID);
-      if (us == null) {
-        throw new NoSuchScanIDException();
-      }
-
-      // clients may or may not see data from an update session while
-      // it is in progress, however when the update session is closed
-      // want to ensure that reads wait for the write to finish
-      long opid = writeTracker.startWrite(us.queuedMutations.keySet());
-
-      try {
-        flush(us);
-      } catch (HoldTimeoutException e) {
-        // Assumption is that the client has timed out and is gone. If that's not the case throw an
-        // exception that will cause it to retry.
-        log.debug("HoldTimeoutException during closeUpdate, reporting no such session");
-        throw new NoSuchScanIDException();
-      } finally {
-        writeTracker.finishWrite(opid);
-      }
-
-      if (log.isTraceEnabled()) {
-        log.trace(
-            String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)",
-                TServerUtils.clientAddress.get(), us.totalUpdates,
-                (System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(),
-                us.flushTime / 1000.0, us.prepareTimes.sum() / 1000.0, us.walogTimes.sum() / 1000.0,
-                us.commitTimes.sum() / 1000.0));
-      }
-      if (us.failures.size() > 0) {
-        Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next();
-        log.debug(String.format("Failures: %d, first extent %s successful commits: %d",
-            us.failures.size(), first.getKey().toString(), first.getValue()));
-      }
-      List<ConstraintViolationSummary> violations = us.violations.asList();
-      if (violations.size() > 0) {
-        ConstraintViolationSummary first = us.violations.asList().iterator().next();
-        log.debug(String.format("Violations: %d, first %s occurs %d", violations.size(),
-            first.violationDescription, first.numberOfViolatingMutations));
-      }
-      if (us.authFailures.size() > 0) {
-        KeyExtent first = us.authFailures.keySet().iterator().next();
-        log.debug(String.format("Authentication Failures: %d, first %s", us.authFailures.size(),
-            first.toString()));
-      }
-
-      return new UpdateErrors(Translator.translate(us.failures, Translators.KET),
-          Translator.translate(violations, Translators.CVST),
-          Translator.translate(us.authFailures, Translators.KET));
-    }
-
-    @Override
-    public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent,
-        TMutation tmutation, TDurability tdurability)
-        throws NotServingTabletException, ConstraintViolationException, ThriftSecurityException {
-
-      final TableId tableId = TableId.of(new String(tkeyExtent.getTable(), UTF_8));
-      NamespaceId namespaceId = getNamespaceId(credentials, tableId);
-      if (!security.canWrite(credentials, tableId, namespaceId)) {
-        throw new ThriftSecurityException(credentials.getPrincipal(),
-            SecurityErrorCode.PERMISSION_DENIED);
-      }
-      final KeyExtent keyExtent = new KeyExtent(tkeyExtent);
-      final Tablet tablet = getOnlineTablet(new KeyExtent(keyExtent));
-      if (tablet == null) {
-        throw new NotServingTabletException(tkeyExtent);
-      }
-      Durability tabletDurability = tablet.getDurability();
-
-      if (!keyExtent.isMeta()) {
-        try {
-          TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
-        } catch (HoldTimeoutException hte) {
-          // Major hack. Assumption is that the client has timed out and is gone. If that's not the
-          // case, then throwing the following will let client know there
-          // was a failure and it should retry.
-          throw new NotServingTabletException(tkeyExtent);
-        }
-      }
-
-      final long opid = writeTracker.startWrite(TabletType.type(keyExtent));
-
-      try {
-        final Mutation mutation = new ServerMutation(tmutation);
-        final List<Mutation> mutations = Collections.singletonList(mutation);
-
-        PreparedMutations prepared;
-        try (TraceScope prep = Trace.startSpan("prep")) {
-          prepared = tablet.prepareMutationsForCommit(
-              new TservConstraintEnv(getContext(), security, credentials), mutations);
-        }
-
-        if (prepared.tabletClosed()) {
-          throw new NotServingTabletException(tkeyExtent);
-        } else if (!prepared.getViolators().isEmpty()) {
-          throw new ConstraintViolationException(
-              Translator.translate(prepared.getViolations().asList(), Translators.CVST));
-        } else {
-          CommitSession session = prepared.getCommitSession();
-          Durability durability = DurabilityImpl
-              .resolveDurabilty(DurabilityImpl.fromThrift(tdurability), tabletDurability);
-
-          // Instead of always looping on true, skip completely when durability is NONE.
-          while (durability != Durability.NONE) {
-            try {
-              try (TraceScope wal = Trace.startSpan("wal")) {
-                logger.log(session, mutation, durability);
-              }
-              break;
-            } catch (IOException ex) {
-              log.warn("Error writing mutations to log", ex);
-            }
-          }
-
-          try (TraceScope commit = Trace.startSpan("commit")) {
-            session.commit(mutations);
-          }
-        }
-      } finally {
-        writeTracker.finishWrite(opid);
-      }
-    }
-
-    private NamespaceId getNamespaceId(TCredentials credentials, TableId tableId)
-        throws ThriftSecurityException {
-      try {
-        return Tables.getNamespaceId(getContext(), tableId);
-      } catch (TableNotFoundException e1) {
-        throw new ThriftSecurityException(credentials.getPrincipal(),
-            SecurityErrorCode.TABLE_DOESNT_EXIST);
-      }
-    }
-
-    private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates,
-        ArrayList<TCMResult> results, ConditionalSession cs, List<String> symbols)
-        throws IOException {
-      Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter =
-          updates.entrySet().iterator();
-
-      final CompressedIterators compressedIters = new CompressedIterators(symbols);
-      ConditionCheckerContext checkerContext = new ConditionCheckerContext(getContext(),
-          compressedIters, getContext().getTableConfiguration(cs.tableId));
-
-      while (iter.hasNext()) {
-        final Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next();
-        final Tablet tablet = getOnlineTablet(entry.getKey());
-
-        if (tablet == null || tablet.isClosed()) {
-          for (ServerConditionalMutation scm : entry.getValue()) {
-            results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-          }
-          iter.remove();
-        } else {
-          final List<ServerConditionalMutation> okMutations =
-              new ArrayList<>(entry.getValue().size());
-          final List<TCMResult> resultsSubList = results.subList(results.size(), results.size());
-
-          ConditionChecker checker =
-              checkerContext.newChecker(entry.getValue(), okMutations, resultsSubList);
-          try {
-            tablet.checkConditions(checker, cs.auths, cs.interruptFlag);
-
-            if (okMutations.size() > 0) {
-              entry.setValue(okMutations);
-            } else {
-              iter.remove();
-            }
-          } catch (TabletClosedException | IterationInterruptedException
-              | TooManyFilesException e) {
-            // clear anything added while checking conditions.
-            resultsSubList.clear();
-
-            for (ServerConditionalMutation scm : entry.getValue()) {
-              results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-            }
-            iter.remove();
-          }
-        }
-      }
-    }
-
-    private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates,
-        ArrayList<TCMResult> results, ConditionalSession sess) {
-      Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
-
-      Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
-      Map<CommitSession,TabletMutations> loggables = new HashMap<>();
-
-      boolean sessionCanceled = sess.interruptFlag.get();
-
-      try (TraceScope prepSpan = Trace.startSpan("prep")) {
-        long t1 = System.currentTimeMillis();
-        for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
-          final Tablet tablet = getOnlineTablet(entry.getKey());
-          if (tablet == null || tablet.isClosed() || sessionCanceled) {
-            addMutationsAsTCMResults(results, entry.getValue(), TCMStatus.IGNORED);
-          } else {
-            final Durability durability =
-                DurabilityImpl.resolveDurabilty(sess.durability, tablet.getDurability());
-
-            @SuppressWarnings("unchecked")
-            List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue();
-            if (!mutations.isEmpty()) {
-
-              PreparedMutations prepared = tablet.prepareMutationsForCommit(
-                  new TservConstraintEnv(getContext(), security, sess.credentials), mutations);
-
-              if (prepared.tabletClosed()) {
-                addMutationsAsTCMResults(results, mutations, TCMStatus.IGNORED);
-              } else {
-                if (!prepared.getNonViolators().isEmpty()) {
-                  // Only log and commit mutations that did not violate constraints.
-                  List<Mutation> validMutations = prepared.getNonViolators();
-                  addMutationsAsTCMResults(results, validMutations, TCMStatus.ACCEPTED);
-                  CommitSession session = prepared.getCommitSession();
-                  if (durability != Durability.NONE) {
-                    loggables.put(session,
-                        new TabletMutations(session, validMutations, durability));
-                  }
-                  sendables.put(session, validMutations);
-                }
-
-                if (!prepared.getViolators().isEmpty()) {
-                  addMutationsAsTCMResults(results, prepared.getViolators(), TCMStatus.VIOLATED);
-                }
-              }
-            }
-          }
-        }
-
-        long t2 = System.currentTimeMillis();
-        updateAvgPrepTime(t2 - t1, es.size());
-      }
-
-      try (TraceScope walSpan = Trace.startSpan("wal")) {
-        while (loggables.size() > 0) {
-          try {
-            long t1 = System.currentTimeMillis();
-            logger.logManyTablets(loggables);
-            long t2 = System.currentTimeMillis();
-            updateWalogWriteTime(t2 - t1);
-            break;
-          } catch (IOException | FSError ex) {
-            log.warn("logging mutations failed, retrying");
-          } catch (Throwable t) {
-            log.error("Unknown exception logging mutations, counts for"
-                + " mutations in flight not decremented!", t);
-            throw new RuntimeException(t);
-          }
-        }
-      }
-
-      try (TraceScope commitSpan = Trace.startSpan("commit")) {
-        long t1 = System.currentTimeMillis();
-        sendables.forEach(CommitSession::commit);
-        long t2 = System.currentTimeMillis();
-        updateAvgCommitTime(t2 - t1, sendables.size());
-      }
-    }
-
-    /**
-     * Transform and add each mutation as a {@link TCMResult} with the mutation's ID and the
-     * specified status to the {@link TCMResult} list.
-     */
-    private void addMutationsAsTCMResults(final List<TCMResult> list,
-        final Collection<? extends Mutation> mutations, final TCMStatus status) {
-      mutations.stream()
-          .map(mutation -> new TCMResult(((ServerConditionalMutation) mutation).getID(), status))
-          .forEach(list::add);
-    }
-
-    private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(ConditionalSession cs,
-        Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results,
-        List<String> symbols) throws IOException {
-      // sort each list of mutations, this is done to avoid deadlock and doing seeks in order is
-      // more efficient and detect duplicate rows.
-      ConditionalMutationSet.sortConditionalMutations(updates);
-
-      Map<KeyExtent,List<ServerConditionalMutation>> deferred = new HashMap<>();
-
-      // can not process two mutations for the same row, because one will not see what the other
-      // writes
-      ConditionalMutationSet.deferDuplicatesRows(updates, deferred);
-
-      // get as many locks as possible w/o blocking... defer any rows that are locked
-      List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
-      try {
-        try (TraceScope checkSpan = Trace.startSpan("Check conditions")) {
-          checkConditions(updates, results, cs, symbols);
-        }
-
-        try (TraceScope updateSpan = Trace.startSpan("apply conditional mutations")) {
-          writeConditionalMutations(updates, results, cs);
-        }
-      } finally {
-        rowLocks.releaseRowLocks(locks);
-      }
-      return deferred;
-    }
-
-    @Override
-    public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials,
-        List<ByteBuffer> authorizations, String tableIdStr, TDurability tdurabilty,
-        String classLoaderContext) throws ThriftSecurityException, TException {
-
-      TableId tableId = TableId.of(tableIdStr);
-      Authorizations userauths = null;
-      NamespaceId namespaceId = getNamespaceId(credentials, tableId);
-      if (!security.canConditionallyUpdate(credentials, tableId, namespaceId)) {
-        throw new ThriftSecurityException(credentials.getPrincipal(),
-            SecurityErrorCode.PERMISSION_DENIED);
-      }
-
-      userauths = security.getUserAuthorizations(credentials);
-      for (ByteBuffer auth : authorizations) {
-        if (!userauths.contains(ByteBufferUtil.toBytes(auth))) {
-          throw new ThriftSecurityException(credentials.getPrincipal(),
-              SecurityErrorCode.BAD_AUTHORIZATIONS);
-        }
-      }
-
-      ConditionalSession cs = new ConditionalSession(credentials,
-          new Authorizations(authorizations), tableId, DurabilityImpl.fromThrift(tdurabilty));
-
-      long sid = sessionManager.createSession(cs, false);
-      return new TConditionalSession(sid, lockID, sessionManager.getMaxIdleTime());
-    }
-
-    @Override
-    public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID,
-        Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
-        throws NoSuchScanIDException, TException {
-
-      ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
-
-      if (cs == null || cs.interruptFlag.get()) {
-        throw new NoSuchScanIDException();
-      }
-
-      if (!cs.tableId.equals(MetadataTable.ID) && !cs.tableId.equals(RootTable.ID)) {
-        try {
-          TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
-        } catch (HoldTimeoutException hte) {
-          // Assumption is that the client has timed out and is gone. If that's not the case throw
-          // an exception that will cause it to retry.
-          log.debug("HoldTimeoutException during conditionalUpdate, reporting no such session");
-          throw new NoSuchScanIDException();
-        }
-      }
-
-      TableId tid = cs.tableId;
-      long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
-
-      try {
-        Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations,
-            Translators.TKET, new Translator.ListTranslator<>(ServerConditionalMutation.TCMT));
-
-        for (KeyExtent ke : updates.keySet()) {
-          if (!ke.getTableId().equals(tid)) {
-            throw new IllegalArgumentException(
-                "Unexpected table id " + tid + " != " + ke.getTableId());
-          }
-        }
-
-        ArrayList<TCMResult> results = new ArrayList<>();
-
-        Map<KeyExtent,List<ServerConditionalMutation>> deferred =
-            conditionalUpdate(cs, updates, results, symbols);
-
-        while (deferred.size() > 0) {
-          deferred = conditionalUpdate(cs, deferred, results, symbols);
-        }
-
-        return results;
-      } catch (IOException ioe) {
-        throw new TException(ioe);
-      } finally {
-        writeTracker.finishWrite(opid);
-        sessionManager.unreserveSession(sessID);
-      }
-    }
-
-    @Override
-    public void invalidateConditionalUpdate(TInfo tinfo, long sessID) {
-      // this method should wait for any running conditional update to complete
-      // after this method returns a conditional update should not be able to start
-
-      ConditionalSession cs = (ConditionalSession) sessionManager.getSession(sessID);
-      if (cs != null) {
-        cs.interruptFlag.set(true);
-      }
-
-      cs = (ConditionalSession) sessionManager.reserveSession(sessID, true);
-      if (cs != null) {
-        sessionManager.removeSession(sessID, true);
-      }
-    }
-
-    @Override
-    public void closeConditionalUpdate(TInfo tinfo, long sessID) {
-      sessionManager.removeSession(sessID, false);
-    }
-
-    @Override
-    public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent,
-        ByteBuffer splitPoint) throws NotServingTabletException, ThriftSecurityException {
-
-      TableId tableId = TableId.of(new String(ByteBufferUtil.toBytes(tkeyExtent.table)));
-      NamespaceId namespaceId = getNamespaceId(credentials, tableId);
-
-      if (!security.canSplitTablet(credentials, tableId, namespaceId)) {
-        throw new ThriftSecurityException(credentials.getPrincipal(),
-            SecurityErrorCode.PERMISSION_DENIED);
-      }
-
-      KeyExtent keyExtent = new KeyExtent(tkeyExtent);
-
-      Tablet tablet = getOnlineTablet(keyExtent);
-      if (tablet == null) {
-        throw new NotServingTabletException(tkeyExtent);
-      }
-
-      if (keyExtent.getEndRow() == null
-          || !keyExtent.getEndRow().equals(ByteBufferUtil.toText(splitPoint))) {
-        try {
-          if (TabletServer.this.splitTablet(tablet, ByteBufferUtil.toBytes(splitPoint)) == null) {
-            throw new NotServingTabletException(tkeyExtent);
-          }
-        } catch (IOException e) {
-          log.warn("Failed to split " + keyExtent, e);
-          throw new RuntimeException(e);
-        }
-      }
-    }
-
-    @Override
-    public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) {
-      return getStats(sessionManager.getActiveScansPerTable());
-    }
-
-    @Override
-    public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) {
-      List<TabletStats> result = new ArrayList<>();
-      TableId text = TableId.of(tableId);
-      KeyExtent start = new KeyExtent(text, new Text(), null);
-      for (Entry<KeyExtent,Tablet> entry : getOnlineTablets().tailMap(start).entrySet()) {
-        KeyExtent ke = entry.getKey();
-        if (ke.getTableId().compareTo(text) == 0) {
-          Tablet tablet = entry.getValue();
-          TabletStats stats = tablet.getTabletStats();
-          stats.extent = ke.toThrift();
-          stats.ingestRate = tablet.ingestRate();
-          stats.queryRate = tablet.queryRate();
-          stats.splitCreationTime = tablet.getSplitCreationTime();
-          stats.numEntries = tablet.getNumEntries();
-          result.add(stats);
-        }
-      }
-      return result;
-    }
-
-    private void checkPermission(TCredentials credentials, String lock, final String request)
-        throws ThriftSecurityException {
-      try {
-        log.trace("Got {} message from user: {}", request, credentials.getPrincipal());
-        if (!security.canPerformSystemActions(credentials)) {
-          log.warn("Got {} message from user: {}", request, credentials.getPrincipal());
-          throw new ThriftSecurityException(credentials.getPrincipal(),
-              SecurityErrorCode.PERMISSION_DENIED);
-        }
-      } catch (ThriftSecurityException e) {
-        log.warn("Got {} message from unauthenticatable user: {}", request, e.getUser());
-        if (getContext().getCredentials().getToken().getClass().getName()
-            .equals(credentials.getTokenClassName())) {
-          log.error("Got message from a service with a mismatched configuration."
-              + " Please ensure a compatible configuration.", e);
-        }
-        throw e;
-      }
-
-      if (tabletServerLock == null || !tabletServerLock.wasLockAcquired()) {
-        log.debug("Got {} message before my lock was acquired, ignoring...", request);
-        throw new RuntimeException("Lock not acquired");
-      }
-
-      if (tabletServerLock != null && tabletServerLock.wasLockAcquired()
-          && !tabletServerLock.isLocked()) {
-        Halt.halt(1, () -> {
-          log.info("Tablet server no longer holds lock during checkPermission() : {}, exiting",
-              request);
-          gcLogger.logGCInfo(TabletServer.this.getConfiguration());
-        });
-      }
-
-      if (lock != null) {
-        ZooUtil.LockID lid =
-            new ZooUtil.LockID(getContext().getZooKeeperRoot() + Constants.ZMASTER_LOCK, lock);
-
-        try {
-          if (!ZooLock.isLockHeld(masterLockCache, lid)) {
-            // maybe the cache is out of date and a new master holds the
-            // lock?
-            masterLockCache.clear();
-            if (!ZooLock.isLockHeld(masterLockCache, lid)) {
-              log.warn("Got {} message from a master that does not hold the current lock {}",
-                  request, lock);
-              throw new RuntimeException("bad master lock");
-            }
-          }
-        } catch (Exception e) {
-          throw new RuntimeException("bad master lock", e);
-        }
-      }
-    }
-
-    @Override
-    public void loadTablet(TInfo tinfo, TCredentials credentials, String lock,
-        final TKeyExtent textent) {
-
-      try {
-        checkPermission(credentials, lock, "loadTablet");
-      } catch (ThriftSecurityException e) {
-        log.error("Caller doesn't have permission to load a tablet", e);
-        throw new RuntimeException(e);
-      }
-
-      final KeyExtent extent = new KeyExtent(textent);
-
-      synchronized (unopenedTablets) {
-        synchronized (openingTablets) {
-          synchronized (onlineTablets) {
-
-            // checking if this exact tablet is in any of the sets
-            // below is not a strong enough check
-            // when splits and fix splits occurring
-
-            Set<KeyExtent> unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets);
-            Set<KeyExtent> openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets);
-            Set<KeyExtent> onlineOverlapping =
-                KeyExtent.findOverlapping(extent, onlineTablets.snapshot());
-
-            Set<KeyExtent> all = new HashSet<>();
-            all.addAll(unopenedOverlapping);
-            all.addAll(openingOverlapping);
-            all.addAll(onlineOverlapping);
-
-            if (!all.isEmpty()) {
-
-              // ignore any tablets that have recently split, for error logging
-              for (KeyExtent e2 : onlineOverlapping) {
-                Tablet tablet = getOnlineTablet(e2);
-                if (System.currentTimeMillis() - tablet.getSplitCreationTime()
-                    < RECENTLY_SPLIT_MILLIES) {
-                  all.remove(e2);
-                }
-              }
-
-              // ignore self, for error logging
-              all.remove(extent);
-
-              if (all.size() > 0) {
-                log.error("Tablet {} overlaps previously assigned {} {} {}", extent,
-                    unopenedOverlapping, openingOverlapping, onlineOverlapping + " " + all);
-              }
-              return;
-            }
-
-            unopenedTablets.add(extent);
-          }
-        }
-      }
-
-      TabletLogger.loading(extent, TabletServer.this.getTabletSession());
-
-      final AssignmentHandler ah = new AssignmentHandler(extent);
-      // final Runnable ah = new LoggingRunnable(log, );
-      // Root tablet assignment must take place immediately
-
-      if (extent.isRootTablet()) {
-        new Daemon("Root Tablet Assignment") {
-          @Override
-          public void run() {
-            ah.run();
-            if (onlineTablets.snapshot().containsKey(extent)) {
-              log.info("Root tablet loaded: {}", extent);
-            } else {
-              log.info("Root tablet failed to load");
-            }
-
-          }
-        }.start();
-      } else {
-        if (extent.isMeta()) {
-          resourceManager.addMetaDataAssignment(extent, log, ah);
-        } else {
-          resourceManager.addAssignment(extent, log, ah);
-        }
-      }
-    }
-
-    @Override
-    public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent,
-        TUnloadTabletGoal goal, long requestTime) {
-      try {
-        checkPermission(credentials, lock, "unloadTablet");
-      } catch (ThriftSecurityException e) {
-        log.error("Caller doesn't have permission to unload a tablet", e);
-        throw new RuntimeException(e);
-      }
-
-      KeyExtent extent = new KeyExtent(textent);
-
-      resourceManager.addMigration(extent,
-          new LoggingRunnable(log, new UnloadTabletHandler(extent, goal, requestTime)));
-    }
-
-    @Override
-    public void flush(TInfo tinfo, TCredentials credentials, String lock, String tableId,
-        ByteBuffer startRow, ByteBuffer endRow) {
-      try {
-        checkPermission(credentials, lock, "flush");
-      } catch (ThriftSecurityException e) {
-        log.error("Caller doesn't have permission to flush a table", e);
-        throw new RuntimeException(e);
-      }
-
-      ArrayList<Tablet> tabletsToFlush = new ArrayList<>();
-
-      KeyExtent ke = new KeyExtent(TableId.of(tableId), ByteBufferUtil.toText(endRow),
-          ByteBufferUtil.toText(startRow));
-
-      for (Tablet tablet : getOnlineTablets().values()) {
-        if (ke.overlaps(tablet.getExtent())) {
-          tabletsToFlush.add(tablet);
-        }
-      }
-
-      Long flushID = null;
-
-      for (Tablet tablet : tabletsToFlush) {
-        if (flushID == null) {
-          // read the flush id once from zookeeper instead of reading
-          // it for each tablet
-          try {
-            flushID = tablet.getFlushID();
-          } catch (NoNodeException e) {
-            // table was probably deleted
-            log.info("Asked to flush table that has no flush id {} {}", ke, e.getMessage());
-            return;
-          }
-        }
-        tablet.flush(flushID);
-      }
-    }
-
-    @Override
-    public void flushTablet(TInfo tinfo, TCredentials credentials, String lock,
-        TKeyExtent textent) {
-      try {
-        checkPermission(credentials, lock, "flushTablet");
-      } catch (ThriftSecurityException e) {
-        log.error("Caller doesn't have permission to flush a tablet", e);
-        throw new RuntimeException(e);
-      }
-
-      Tablet tablet = getOnlineTablet(new KeyExtent(textent));
-      if (tablet != null) {
-        log.info("Flushing {}", tablet.getExtent());
-        try {
-          tablet.flush(tablet.getFlushID());
-        } catch (NoNodeException nne) {
-          log.info("Asked to flush tablet that has no flush id {} {}", new KeyExtent(textent),
-              nne.getMessage());
-        }
-      }
-    }
-
-    @Override
-    public void halt(TInfo tinfo, TCredentials credentials, String lock)
-        throws ThriftSecurityException {
-
-      checkPermission(credentials, lock, "halt");
-
-      Halt.halt(0, () -> {
-        log.info("Master requested tablet server halt");
-        gcLogger.logGCInfo(TabletServer.this.getConfiguration());
-        serverStopRequested = true;
-        try {
-          tabletServerLock.unlock();
-        } catch (Exception e) {
-          log.error("Caught exception unlocking TabletServer lock", e);
-        }
-      });
-    }
-
-    @Override
-    public void fastHalt(TInfo info, TCredentials credentials, String lock) {
-      try {
-        halt(info, credentials, lock);
-      } catch (Exception e) {
-        log.warn("Error halting", e);
-      }
-    }
-
-    @Override
-    public TabletStats getHistoricalStats(TInfo tinfo, TCredentials credentials) {
-      return statsKeeper.getTabletStats();
-    }
-
-    @Override
-    public List<ActiveScan> getActiveScans(TInfo tinfo, TCredentials credentials)
-        throws ThriftSecurityException, TException {
-      try {
-        checkPermission(credentials, null, "getScans");
-      } catch (ThriftSecurityException e) {
-        log.error("Caller doesn't have permission to get active scans", e);
-        throw e;
-      }
-
-      return sessionManager.getActiveScans();
-    }
-
-    @Override
-    public void chop(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) {
-      try {
-        checkPermission(credentials, lock, "chop");
-      } catch (ThriftSecurityException e) {
-        log.error("Caller doesn't have permission to chop extent", e);
-        throw new RuntimeException(e);
-      }
-
-      KeyExtent ke = new KeyExtent(textent);
-
-      Tablet tablet = getOnlineTablet(ke);
-      if (tablet != null) {
-        tablet.chopFiles();
-      }
-    }
-
-    @Override
-    public void compact(TInfo tinfo, TCredentials credentials, String lock, String tableId,
-        ByteBuffer startRow, ByteBuffer endRow) {
-      try {
-        checkPermission(credentials, lock, "compact");
-      } catch (ThriftSecurityException e) {
-        log.error("Caller doesn't have permission to compact a table", e);
-        throw new RuntimeException(e);
-      }
-
-      KeyExtent ke = new KeyExtent(TableId.of(tableId), ByteBufferUtil.toText(endRow),
-          ByteBufferUtil.toText(startRow));
-
-      ArrayList<Tablet> tabletsToCompact = new ArrayList<>();
-
-      for (Tablet tablet : getOnlineTablets().values()) {
-        if (ke.overlaps(tablet.getExtent())) {
-          tabletsToCompact.add(tablet);
-        }
-      }
-
-      Pair<Long,UserCompactionConfig> compactionInfo = null;
-
-      for (Tablet tablet : tabletsToCompact) {
-        // all for the same table id, so only need to read
-        // compaction id once
-        if (compactionInfo == null) {
-          try {
-            compactionInfo = tablet.getCompactionID();
-          } catch (NoNodeException e) {
-            log.info("Asked to compact table with no compaction id {} {}", ke, e.getMessage());
-            return;
-          }
-        }
-        tablet.compactAll(compactionInfo.getFirst(), compactionInfo.getSecond());
-      }
-
-    }
-
-    @Override
-    public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials)
-        throws ThriftSecurityException, TException {
-      try {
-        checkPermission(credentials, null, "getActiveCompactions");
-      } catch (ThriftSecurityException e) {
-        log.error("Caller doesn't have permission to get active compactions", e);
-        throw e;
-      }
-
-      List<CompactionInfo> compactions = Compactor.getRunningCompactions();
-      List<ActiveCompaction> ret = new ArrayList<>(compactions.size());
-
-      for (CompactionInfo compactionInfo : compactions) {
-        ret.add(compactionInfo.toThrift());
-      }
-
-      return ret;
-    }
-
-    @Override
-    public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) {
-      String log = logger.getLogFile();
-      // Might be null if there no active logger
-      if (log == null) {
-        return Collections.emptyList();
-      }
-      return Collections.singletonList(log);
-    }
-
-    @Override
-    public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) {
-      log.warn("Garbage collector is attempting to remove logs through the tablet server");
-      log.warn("This is probably because your file"
-          + " Garbage Collector is an older version than your tablet servers.\n"
-          + "Restart your file Garbage Collector.");
-    }
-
-    private TSummaries getSummaries(Future<SummaryCollection> future) throws TimeoutException {
-      try {
-        SummaryCollection sc =
-            future.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
-        return sc.toThrift();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
-      } catch (ExecutionException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    private TSummaries handleTimeout(long sessionId) {
-      long timeout =
-          TabletServer.this.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
-      sessionManager.removeIfNotAccessed(sessionId, timeout);
-      return new TSummaries(false, sessionId, -1, -1, null);
-    }
-
-    private TSummaries startSummaryOperation(TCredentials credentials,
-        Future<SummaryCollection> future) {
-      try {
-        return getSummaries(future);
-      } catch (TimeoutException e) {
-        long sid = sessionManager.createSession(new SummarySession(credentials, future), false);
-        while (sid == 0) {
-          sessionManager.removeSession(sid);
-          sid = sessionManager.createSession(new SummarySession(credentials, future), false);
-        }
-        return handleTimeout(sid);
-      }
-    }
-
-    @Override
-    public TSummaries startGetSummaries(TInfo tinfo, TCredentials credentials,
-        TSummaryRequest request)
-        throws ThriftSecurityException, ThriftTableOperationException, TException {
-      NamespaceId namespaceId;
-      TableId tableId = TableId.of(request.getTableId());
-      try {
-        namespaceId = Tables.getNamespaceId(getContext(), tableId);
-      } catch (TableNotFoundException e1) {
-        throw new ThriftTableOperationException(tableId.canonical(), null, null,
-            TableOperationExceptionType.NOTFOUND, null);
-      }
-
-      if (!security.canGetSummaries(credentials, tableId, namespaceId)) {
-        throw new AccumuloSecurityException(credentials.getPrincipal(),
-            SecurityErrorCode.PERMISSION_DENIED).asThriftException();
-      }
-
-      ExecutorService es = resourceManager.getSummaryPartitionExecutor();
-      Future<SummaryCollection> future = new Gatherer(getContext(), request,
-          getContext().getTableConfiguration(tableId), getContext().getCryptoService()).gather(es);
-
-      return startSummaryOperation(credentials, future);
-    }
-
-    @Override
-    public TSummaries startGetSummariesForPartition(TInfo tinfo, TCredentials credentials,
-        TSummaryRequest request, int modulus, int remainder)
-        throws ThriftSecurityException, TException {
-      // do not expect users to call this directly, expect other tservers to call this method
-      if (!security.canPerformSystemActions(credentials)) {
-        throw new AccumuloSecurityException(credentials.getPrincipal(),
-            SecurityErrorCode.PERMISSION_DENIED).asThriftException();
-      }
-
-      ExecutorService spe = resourceManager.getSummaryRemoteExecutor();
-      TableConfiguration tableConfig =
-          getContext().getTableConfiguration(TableId.of(request.getTableId()));
-      Future<SummaryCollection> future =
-          new Gatherer(getContext(), request, tableConfig, getContext().getCryptoService())
-              .processPartition(spe, modulus, remainder);
-
-      return startSummaryOperation(credentials, future);
-    }
-
-    @Override
-    public TSummaries startGetSummariesFromFiles(TInfo tinfo, TCredentials credentials,
-        TSummaryRequest request, Map<String,List<TRowRange>> files)
-        throws ThriftSecurityException, TException {
-      // do not expect users to call this directly, expect other tservers to call this method
-      if (!security.canPerformSystemActions(credentials)) {
-        throw new AccumuloSecurityException(credentials.getPrincipal(),
-            SecurityErrorCode.PERMISSION_DENIED).asThriftException();
-      }
-
-      ExecutorService srp = resourceManager.getSummaryRetrievalExecutor();
-      TableConfiguration tableCfg =
-          getContext().getTableConfiguration(TableId.of(request.getTableId()));
-      BlockCache summaryCache = resourceManager.getSummaryCache();
-      BlockCache indexCache = resourceManager.getIndexCache();
-      Cache<String,Long> fileLenCache = resourceManager.getFileLenCache();
-      FileSystemResolver volMgr = p -> fs.getFileSystemByPath(p);
-      Future<SummaryCollection> future =
-          new Gatherer(getContext(), request, tableCfg, getContext().getCryptoService())
-              .processFiles(volMgr, files, summaryCache, indexCache, fileLenCache, srp);
-
-      return startSummaryOperation(credentials, future);
-    }
+  private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
+  private final ReentrantLock recoveryLock = new ReentrantLock(true);
+  private ThriftClientHandler clientHandler;
+  private final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();
 
-    @Override
-    public TSummaries contiuneGetSummaries(TInfo tinfo, long sessionId)
-        throws NoSuchScanIDException, TException {
-      SummarySession session = (SummarySession) sessionManager.getSession(sessionId);
-      if (session == null) {
-        throw new NoSuchScanIDException();
-      }
+  String getLockID() {
+    return lockID;
+  }
 
-      Future<SummaryCollection> future = session.getFuture();
-      try {
-        TSummaries tsums = getSummaries(future);
-        sessionManager.removeSession(sessionId);
-        return tsums;
-      } catch (TimeoutException e) {
-        return handleTimeout(sessionId);
-      }
-    }
+  void requestStop() {
+    serverStopRequested = true;
   }
 
   private class SplitRunner implements Runnable {
@@ -2228,8 +479,7 @@ public class TabletServer extends AbstractServer {
     }
   }
 
-  private TreeMap<KeyExtent,TabletData> splitTablet(Tablet tablet, byte[] splitPoint)
-      throws IOException {
+  TreeMap<KeyExtent,TabletData> splitTablet(Tablet tablet, byte[] splitPoint) throws IOException {
     long t1 = System.currentTimeMillis();
 
     TreeMap<KeyExtent,TabletData> tabletInfo = tablet.split(splitPoint);
@@ -2278,301 +528,13 @@ public class TabletServer extends AbstractServer {
     masterMessages.addLast(m);
   }
 
-  private class UnloadTabletHandler implements Runnable {
-    private final KeyExtent extent;
-    private final TUnloadTabletGoal goalState;
-    private final long requestTimeSkew;
-
-    public UnloadTabletHandler(KeyExtent extent, TUnloadTabletGoal goalState, long requestTime) {
-      this.extent = extent;
-      this.goalState = goalState;
-      this.requestTimeSkew = requestTime - MILLISECONDS.convert(System.nanoTime(), NANOSECONDS);
-    }
-
-    @Override
-    public void run() {
-
-      Tablet t = null;
-
-      synchronized (unopenedTablets) {
-        if (unopenedTablets.contains(extent)) {
-          unopenedTablets.remove(extent);
-          // enqueueMasterMessage(new TabletUnloadedMessage(extent));
-          return;
-        }
-      }
-      synchronized (openingTablets) {
-        while (openingTablets.contains(extent)) {
-          try {
-            openingTablets.wait();
-          } catch (InterruptedException e) {}
-        }
-      }
-      synchronized (onlineTablets) {
-        if (onlineTablets.snapshot().containsKey(extent)) {
-          t = onlineTablets.snapshot().get(extent);
-        }
-      }
-
-      if (t == null) {
-        // Tablet has probably been recently unloaded: repeated master
-        // unload request is crossing the successful unloaded message
-        if (!recentlyUnloadedCache.containsKey(extent)) {
-          log.info("told to unload tablet that was not being served {}", extent);
-          enqueueMasterMessage(
-              new TabletStatusMessage(TabletLoadState.UNLOAD_FAILURE_NOT_SERVING, extent));
-        }
-        return;
-      }
-
-      try {
-        t.close(!goalState.equals(TUnloadTabletGoal.DELETED));
-      } catch (Throwable e) {
-
-        if ((t.isClosing() || t.isClosed()) && e instanceof IllegalStateException) {
-          log.debug("Failed to unload tablet {}... it was already closing or closed : {}", extent,
-              e.getMessage());
-        } else {
-          log.error("Failed to close tablet {}... Aborting migration", extent, e);
-          enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.UNLOAD_ERROR, extent));
-        }
-        return;
-      }
-
-      // stop serving tablet - client will get not serving tablet
-      // exceptions
-      recentlyUnloadedCache.put(extent, System.currentTimeMillis());
-      onlineTablets.remove(extent);
-
-      try {
-        TServerInstance instance = new TServerInstance(clientAddress, getLock().getSessionId());
-        TabletLocationState tls = null;
-        try {
-          tls = new TabletLocationState(extent, null, instance, null, null, null, false);
-        } catch (BadLocationStateException e) {
-          log.error("Unexpected error", e);
-        }
-        if (!goalState.equals(TUnloadTabletGoal.SUSPENDED) || extent.isRootTablet()
-            || (extent.isMeta()
-                && !getConfiguration().getBoolean(Property.MASTER_METADATA_SUSPENDABLE))) {
-          TabletStateStore.unassign(getContext(), tls, null);
-        } else {
-          TabletStateStore.suspend(getContext(), tls, null,
-              requestTimeSkew + MILLISECONDS.convert(System.nanoTime(), NANOSECONDS));
-        }
-      } catch (DistributedStoreException ex) {
-        log.warn("Unable to update storage", ex);
-      } catch (KeeperException e) {
-        log.warn("Unable determine our zookeeper session information", e);
-      } catch (InterruptedException e) {
-        log.warn("Interrupted while getting our zookeeper session information", e);
-      }
-
-      // tell the master how it went
-      enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.UNLOADED, extent));
-
-      // roll tablet stats over into tablet server's statsKeeper object as
-      // historical data
-      statsKeeper.saveMajorMinorTimes(t.getTabletStats());
-    }
-  }
-
-  protected class AssignmentHandler implements Runnable {
-    private final KeyExtent extent;
-    private final int retryAttempt;
-
-    public AssignmentHandler(KeyExtent extent) {
-      this(extent, 0);
-    }
-
-    public AssignmentHandler(KeyExtent extent, int retryAttempt) {
-      this.extent = extent;
-      this.retryAttempt = retryAttempt;
-    }
-
-    @Override
-    public void run() {
-      synchronized (unopenedTablets) {
-        synchronized (openingTablets) {
-          synchronized (onlineTablets) {
-            // nothing should be moving between sets, do a sanity
-            // check
-            Set<KeyExtent> unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets);
-            Set<KeyExtent> openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets);
-            Set<KeyExtent> onlineOverlapping =
-                KeyExtent.findOverlapping(extent, onlineTablets.snapshot());
-
-            if (openingOverlapping.contains(extent) || onlineOverlapping.contains(extent)) {
-              return;
-            }
-
-            if (!unopenedOverlapping.contains(extent)) {
-              log.info("assignment {} no longer in the unopened set", extent);
-              return;
-            }
-
-            if (unopenedOverlapping.size() != 1 || openingOverlapping.size() > 0
-                || onlineOverlapping.size() > 0) {
-              throw new IllegalStateException(
-                  "overlaps assigned " + extent + " " + !unopenedTablets.contains(extent) + " "
-                      + unopenedOverlapping + " " + openingOverlapping + " " + onlineOverlapping);
-            }
-          }
-
-          unopenedTablets.remove(extent);
-          openingTablets.add(extent);
-        }
-      }
-
-      // check Metadata table before accepting assignment
-      Text locationToOpen = null;
-      TabletMetadata tabletMetadata = null;
-      boolean canLoad = false;
-      try {
-        tabletMetadata = getContext().getAmple().readTablet(extent);
-
-        canLoad = checkTabletMetadata(extent, TabletServer.this.getTabletSession(), tabletMetadata);
-
-        if (canLoad && tabletMetadata.sawOldPrevEndRow()) {
-          KeyExtent fixedExtent =
-              MasterMetadataUtil.fixSplit(getContext(), tabletMetadata, getLock());
-
-          synchronized (openingTablets) {
-            openingTablets.remove(extent);
-            openingTablets.notifyAll();
-            // it expected that the new extent will overlap the old one... if it does not, it
-            // should not be added to unopenedTablets
-            if (!KeyExtent.findOverlapping(extent, new TreeSet<>(Arrays.asList(fixedExtent)))
-                .contains(fixedExtent)) {
-              throw new IllegalStateException(
-                  "Fixed split does not overlap " + extent + " " + fixedExtent);
-            }
-            unopenedTablets.add(fixedExtent);
-          }
-          // split was rolled back... try again
-          new AssignmentHandler(fixedExtent).run();
-          return;
-
-        }
-      } catch (Exception e) {
-        synchronized (openingTablets) {
-          openingTablets.remove(extent);
-          openingTablets.notifyAll();
-        }
-        log.warn("Failed to verify tablet " + extent, e);
-        enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
-        throw new RuntimeException(e);
-      }
-
-      if (!canLoad) {
-        log.debug("Reporting tablet {} assignment failure: unable to verify Tablet Information",
-            extent);
-        synchronized (openingTablets) {
-          openingTablets.remove(extent);
-          openingTablets.notifyAll();
-        }
-        enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
-        return;
-      }
-
-      Tablet tablet = null;
-      boolean successful = false;
-
-      try {
-        acquireRecoveryMemory(extent);
-
-        TabletResourceManager trm =
-            resourceManager.createTabletResourceManager(extent, getTableConfiguration(extent));
-        TabletData data = new TabletData(tabletMetadata);
-
-        tablet = new Tablet(TabletServer.this, extent, trm, data);
-        // If a minor compaction starts after a tablet opens, this indicates a log recovery
-        // occurred. This recovered data must be minor compacted.
-        // There are three reasons to wait for this minor compaction to finish before placing the
-        // tablet in online tablets.
-        //
-        // 1) The log recovery code does not handle data written to the tablet on multiple tablet
-        // servers.
-        // 2) The log recovery code does not block if memory is full. Therefore recovering lots of
-        // tablets that use a lot of memory could run out of memory.
-        // 3) The minor compaction finish event did not make it to the logs (the file will be in
-        // metadata, preventing replay of compacted data)... but do not
-        // want a majc to wipe the file out from metadata and then have another process failure...
-        // this could cause duplicate data to replay.
-        if (tablet.getNumEntriesInMemory() > 0
-            && !tablet.minorCompactNow(MinorCompactionReason.RECOVERY)) {
-          throw new RuntimeException("Minor compaction after recovery fails for " + extent);
-        }
-        Assignment assignment = new Assignment(extent, getTabletSession());
-        TabletStateStore.setLocation(getContext(), assignment);
-
-        synchronized (openingTablets) {
-          synchronized (onlineTablets) {
-            openingTablets.remove(extent);
-            onlineTablets.put(extent, tablet);
-            openingTablets.notifyAll();
-            recentlyUnloadedCache.remove(tablet.getExtent());
-          }
-        }
-        tablet = null; // release this reference
-        successful = true;
-      } catch (Throwable e) {
-        log.warn("exception trying to assign tablet {} {}", extent, locationToOpen, e);
-
-        if (e.getMessage() != null) {
-          log.warn("{}", e.getMessage());
-        }
-
-        TableId tableId = extent.getTableId();
-        ProblemReports.getInstance(getContext()).report(new ProblemReport(tableId, TABLET_LOAD,
-            extent.getUUID().toString(), getClientAddressString(), e));
-      } finally {
-        releaseRecoveryMemory(extent);
-      }
-
-      if (!successful) {
-        synchronized (unopenedTablets) {
-          synchronized (openingTablets) {
-            openingTablets.remove(extent);
-            unopenedTablets.add(extent);
-            openingTablets.notifyAll();
-          }
-        }
-        log.warn("failed to open tablet {} reporting failure to master", extent);
-        enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
-        long reschedule = Math.min((1L << Math.min(32, retryAttempt)) * 1000, 10 * 60 * 1000L);
-        log.warn(String.format("rescheduling tablet load in %.2f seconds", reschedule / 1000.));
-        SimpleTimer.getInstance(getConfiguration()).schedule(new TimerTask() {
-          @Override
-          public void run() {
-            log.info("adding tablet {} back to the assignment pool (retry {})", extent,
-                retryAttempt);
-            AssignmentHandler handler = new AssignmentHandler(extent, retryAttempt + 1);
-            if (extent.isMeta()) {
-              if (extent.isRootTablet()) {
-                new Daemon(new LoggingRunnable(log, handler), "Root tablet assignment retry")
-                    .start();
-              } else {
-                resourceManager.addMetaDataAssignment(extent, log, handler);
-              }
-            } else {
-              resourceManager.addAssignment(extent, log, handler);
-            }
-          }
-        }, reschedule);
-      } else {
-        enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOADED, extent));
-      }
-    }
-  }
-
-  private void acquireRecoveryMemory(KeyExtent extent) {
+  void acquireRecoveryMemory(KeyExtent extent) {
     if (!extent.isMeta()) {
       recoveryLock.lock();
     }
   }
 
-  private void releaseRecoveryMemory(KeyExtent extent) {
+  void releaseRecoveryMemory(KeyExtent extent) {
     if (!extent.isMeta()) {
       recoveryLock.unlock();
     }
@@ -2624,7 +586,7 @@ public class TabletServer extends AbstractServer {
 
   private HostAndPort startTabletClientService() throws UnknownHostException {
     // start listening for client connection last
-    clientHandler = new ThriftClientHandler();
+    clientHandler = new ThriftClientHandler(this);
     Iface rpcProxy = TraceUtil.wrapService(clientHandler);
     final Processor<Iface> processor;
     if (getContext().getThriftServerType() == ThriftServerType.SASL) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 41a748b..0c68658 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -78,7 +78,6 @@ import org.apache.accumulo.server.tabletserver.MemoryManager;
 import org.apache.accumulo.server.tabletserver.TabletState;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.tserver.FileManager.ScanFileManager;
-import org.apache.accumulo.tserver.TabletServer.AssignmentHandler;
 import org.apache.accumulo.tserver.compaction.CompactionStrategy;
 import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
 import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
new file mode 100644
index 0000000..ba667d7
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
@@ -0,0 +1,1792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Durability;
+import org.apache.accumulo.core.client.SampleNotPresentException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.clientImpl.CompressedIterators;
+import org.apache.accumulo.core.clientImpl.DurabilityImpl;
+import org.apache.accumulo.core.clientImpl.Tables;
+import org.apache.accumulo.core.clientImpl.TabletType;
+import org.apache.accumulo.core.clientImpl.Translator;
+import org.apache.accumulo.core.clientImpl.Translator.TKeyExtentTranslator;
+import org.apache.accumulo.core.clientImpl.Translator.TRangeTranslator;
+import org.apache.accumulo.core.clientImpl.Translators;
+import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.ConstraintViolationSummary;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
+import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
+import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.ScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.TCMResult;
+import org.apache.accumulo.core.dataImpl.thrift.TCMStatus;
+import org.apache.accumulo.core.dataImpl.thrift.TColumn;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalSession;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyValue;
+import org.apache.accumulo.core.dataImpl.thrift.TMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TRange;
+import org.apache.accumulo.core.dataImpl.thrift.TRowRange;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaries;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaryRequest;
+import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.logging.TabletLogger;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.TabletFile;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.spi.cache.BlockCache;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.summary.Gatherer;
+import org.apache.accumulo.core.summary.Gatherer.FileSystemResolver;
+import org.apache.accumulo.core.summary.SummaryCollection;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TDurability;
+import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
+import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
+import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.fate.util.LoggingRunnable;
+import org.apache.accumulo.fate.zookeeper.ZooLock;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.client.ClientServiceHandler;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.server.master.tableOps.UserCompactionConfig;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher;
+import org.apache.accumulo.tserver.ConditionCheckerContext.ConditionChecker;
+import org.apache.accumulo.tserver.RowLocks.RowLock;
+import org.apache.accumulo.tserver.data.ServerConditionalMutation;
+import org.apache.accumulo.tserver.scan.LookupTask;
+import org.apache.accumulo.tserver.scan.NextBatchTask;
+import org.apache.accumulo.tserver.scan.ScanParameters;
+import org.apache.accumulo.tserver.session.ConditionalSession;
+import org.apache.accumulo.tserver.session.MultiScanSession;
+import org.apache.accumulo.tserver.session.SingleScanSession;
+import org.apache.accumulo.tserver.session.SummarySession;
+import org.apache.accumulo.tserver.session.UpdateSession;
+import org.apache.accumulo.tserver.tablet.CommitSession;
+import org.apache.accumulo.tserver.tablet.CompactionInfo;
+import org.apache.accumulo.tserver.tablet.Compactor;
+import org.apache.accumulo.tserver.tablet.KVEntry;
+import org.apache.accumulo.tserver.tablet.PreparedMutations;
+import org.apache.accumulo.tserver.tablet.ScanBatch;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletClosedException;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.collect.Collections2;
+
+class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
+
+  private static final Logger log = LoggerFactory.getLogger(ThriftClientHandler.class);
+  private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
+  private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
+  private final TabletServer server;
+  private final WriteTracker writeTracker = new WriteTracker();
+  private final RowLocks rowLocks = new RowLocks();
+
+  ThriftClientHandler(TabletServer server) {
+    super(server.getContext(), new TransactionWatcher(server.getContext()), server.getFileSystem());
+    this.server = server;
+    log.debug("{} created", ThriftClientHandler.class.getName());
+  }
+
+  @Override
+  public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, final long tid,
+      final Map<TKeyExtent,Map<String,MapFileInfo>> files, final boolean setTime)
+      throws ThriftSecurityException {
+
+    if (!security.canPerformSystemActions(credentials)) {
+      throw new ThriftSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.PERMISSION_DENIED);
+    }
+
+    try {
+      return transactionWatcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, () -> {
+        List<TKeyExtent> failures = new ArrayList<>();
+
+        for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
+          TKeyExtent tke = entry.getKey();
+          Map<String,MapFileInfo> fileMap = entry.getValue();
+          Map<TabletFile,MapFileInfo> fileRefMap = new HashMap<>();
+          for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
+            Path path = new Path(mapping.getKey());
+            FileSystem ns = fs.getFileSystemByPath(path);
+            path = ns.makeQualified(path);
+            fileRefMap.put(new TabletFile(path), mapping.getValue());
+          }
+
+          Tablet importTablet = server.getOnlineTablet(new KeyExtent(tke));
+
+          if (importTablet == null) {
+            failures.add(tke);
+          } else {
+            try {
+              importTablet.importMapFiles(tid, fileRefMap, setTime);
+            } catch (IOException ioe) {
+              log.info("files {} not imported to {}: {}", fileMap.keySet(), new KeyExtent(tke),
+                  ioe.getMessage());
+              failures.add(tke);
+            }
+          }
+        }
+        return failures;
+      });
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void loadFiles(TInfo tinfo, TCredentials credentials, long tid, String dir,
+      Map<TKeyExtent,Map<String,MapFileInfo>> tabletImports, boolean setTime)
+      throws ThriftSecurityException {
+    if (!security.canPerformSystemActions(credentials)) {
+      throw new ThriftSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.PERMISSION_DENIED);
+    }
+
+    transactionWatcher.runQuietly(Constants.BULK_ARBITRATOR_TYPE, tid, () -> {
+      tabletImports.forEach((tke, fileMap) -> {
+        Map<TabletFile,MapFileInfo> newFileMap = new HashMap<>();
+        for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
+          Path path = new Path(dir, mapping.getKey());
+          FileSystem ns = fs.getFileSystemByPath(path);
+          path = ns.makeQualified(path);
+          newFileMap.put(new TabletFile(path), mapping.getValue());
+        }
+
+        Tablet importTablet = server.getOnlineTablet(new KeyExtent(tke));
+
+        if (importTablet != null) {
+          try {
+            importTablet.importMapFiles(tid, newFileMap, setTime);
+          } catch (IOException ioe) {
+            log.debug("files {} not imported to {}: {}", fileMap.keySet(), new KeyExtent(tke),
+                ioe.getMessage());
+          }
+        }
+      });
+    });
+
+  }
+
+  private ScanDispatcher getScanDispatcher(KeyExtent extent) {
+    if (extent.isRootTablet() || extent.isMeta()) {
+      // dispatcher is only for user tables
+      return null;
+    }
+
+    return server.getContext().getTableConfiguration(extent.getTableId()).getScanDispatcher();
+  }
+
+  @Override
+  public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent,
+      TRange range, List<TColumn> columns, int batchSize, List<IterInfo> ssiList,
+      Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites,
+      boolean isolated, long readaheadThreshold, TSamplerConfiguration tSamplerConfig,
+      long batchTimeOut, String contextArg, Map<String,String> executionHints)
+      throws NotServingTabletException, ThriftSecurityException,
+      org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
+      TSampleNotPresentException {
+
+    TableId tableId = TableId.of(new String(textent.getTable(), UTF_8));
+    NamespaceId namespaceId;
+    try {
+      namespaceId = Tables.getNamespaceId(server.getContext(), tableId);
+    } catch (TableNotFoundException e1) {
+      throw new NotServingTabletException(textent);
+    }
+    if (!security.canScan(credentials, tableId, namespaceId, range, columns, ssiList, ssio,
+        authorizations)) {
+      throw new ThriftSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.PERMISSION_DENIED);
+    }
+
+    if (!security.authenticatedUserHasAuthorizations(credentials, authorizations)) {
+      throw new ThriftSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.BAD_AUTHORIZATIONS);
+    }
+
+    final KeyExtent extent = new KeyExtent(textent);
+
+    // wait for any writes that are in flight.. this done to ensure
+    // consistency across client restarts... assume a client writes
+    // to accumulo and dies while waiting for a confirmation from
+    // accumulo... the client process restarts and tries to read
+    // data from accumulo making the assumption that it will get
+    // any writes previously made... however if the server side thread
+    // processing the write from the dead client is still in progress,
+    // the restarted client may not see the write unless we wait here.
+    // this behavior is very important when the client is reading the
+    // metadata
+    if (waitForWrites) {
+      writeTracker.waitForWrites(TabletType.type(extent));
+    }
+
+    Tablet tablet = server.getOnlineTablet(extent);
+    if (tablet == null) {
+      throw new NotServingTabletException(textent);
+    }
+
+    HashSet<Column> columnSet = new HashSet<>();
+    for (TColumn tcolumn : columns) {
+      columnSet.add(new Column(tcolumn));
+    }
+
+    ScanParameters scanParams = new ScanParameters(batchSize, new Authorizations(authorizations),
+        columnSet, ssiList, ssio, isolated, SamplerConfigurationImpl.fromThrift(tSamplerConfig),
+        batchTimeOut, contextArg);
+
+    final SingleScanSession scanSession =
+        new SingleScanSession(credentials, extent, scanParams, readaheadThreshold, executionHints);
+    scanSession.scanner =
+        tablet.createScanner(new Range(range), scanParams, scanSession.interruptFlag);
+
+    long sid = server.sessionManager.createSession(scanSession, true);
+
+    ScanResult scanResult;
+    try {
+      scanResult = continueScan(tinfo, sid, scanSession);
+    } catch (NoSuchScanIDException e) {
+      log.error("The impossible happened", e);
+      throw new RuntimeException();
+    } finally {
+      server.sessionManager.unreserveSession(sid);
+    }
+
+    return new InitialScan(sid, scanResult);
+  }
+
+  @Override
+  public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException,
+      NotServingTabletException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
+      TSampleNotPresentException {
+    SingleScanSession scanSession =
+        (SingleScanSession) server.sessionManager.reserveSession(scanID);
+    if (scanSession == null) {
+      throw new NoSuchScanIDException();
+    }
+
+    try {
+      return continueScan(tinfo, scanID, scanSession);
+    } finally {
+      server.sessionManager.unreserveSession(scanSession);
+    }
+  }
+
+  private ScanResult continueScan(TInfo tinfo, long scanID, SingleScanSession scanSession)
+      throws NoSuchScanIDException, NotServingTabletException,
+      org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
+      TSampleNotPresentException {
+
+    if (scanSession.nextBatchTask == null) {
+      scanSession.nextBatchTask = new NextBatchTask(server, scanID, scanSession.interruptFlag);
+      server.resourceManager.executeReadAhead(scanSession.extent,
+          getScanDispatcher(scanSession.extent), scanSession, scanSession.nextBatchTask);
+    }
+
+    ScanBatch bresult;
+    try {
+      bresult = scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS,
+          TimeUnit.MILLISECONDS);
+      scanSession.nextBatchTask = null;
+    } catch (ExecutionException e) {
+      server.sessionManager.removeSession(scanID);
+      if (e.getCause() instanceof NotServingTabletException) {
+        throw (NotServingTabletException) e.getCause();
+      } else if (e.getCause() instanceof TooManyFilesException) {
+        throw new org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(
+            scanSession.extent.toThrift());
+      } else if (e.getCause() instanceof SampleNotPresentException) {
+        throw new TSampleNotPresentException(scanSession.extent.toThrift());
+      } else if (e.getCause() instanceof IOException) {
+        sleepUninterruptibly(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
+        List<KVEntry> empty = Collections.emptyList();
+        bresult = new ScanBatch(empty, true);
+        scanSession.nextBatchTask = null;
+      } else {
+        throw new RuntimeException(e);
+      }
+    } catch (CancellationException ce) {
+      server.sessionManager.removeSession(scanID);
+      Tablet tablet = server.getOnlineTablet(scanSession.extent);
+      if (tablet == null || tablet.isClosed()) {
+        throw new NotServingTabletException(scanSession.extent.toThrift());
+      } else {
+        throw new NoSuchScanIDException();
+      }
+    } catch (TimeoutException e) {
+      List<TKeyValue> param = Collections.emptyList();
+      long timeout = server.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
+      server.sessionManager.removeIfNotAccessed(scanID, timeout);
+      return new ScanResult(param, true);
+    } catch (Throwable t) {
+      server.sessionManager.removeSession(scanID);
+      log.warn("Failed to get next batch", t);
+      throw new RuntimeException(t);
+    }
+
+    ScanResult scanResult = new ScanResult(Key.compress(bresult.getResults()), bresult.isMore());
+
+    scanSession.entriesReturned += scanResult.results.size();
+
+    scanSession.batchCount++;
+
+    if (scanResult.more && scanSession.batchCount > scanSession.readaheadThreshold) {
+      // start reading next batch while current batch is transmitted
+      // to client
+      scanSession.nextBatchTask = new NextBatchTask(server, scanID, scanSession.interruptFlag);
+      server.resourceManager.executeReadAhead(scanSession.extent,
+          getScanDispatcher(scanSession.extent), scanSession, scanSession.nextBatchTask);
+    }
+
+    if (!scanResult.more) {
+      closeScan(tinfo, scanID);
+    }
+
+    return scanResult;
+  }
+
+  @Override
+  public void closeScan(TInfo tinfo, long scanID) {
+    final SingleScanSession ss = (SingleScanSession) server.sessionManager.removeSession(scanID);
+    if (ss != null) {
+      long t2 = System.currentTimeMillis();
+
+      if (log.isTraceEnabled()) {
+        log.trace(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ",
+            TServerUtils.clientAddress.get(), ss.extent.getTableId(), ss.entriesReturned,
+            (t2 - ss.startTime) / 1000.0, ss.runStats.toString()));
+      }
+
+      server.scanMetrics.addScan(t2 - ss.startTime);
+      server.scanMetrics.addResult(ss.entriesReturned);
+    }
+  }
+
+  @Override
+  public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials,
+      Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns, List<IterInfo> ssiList,
+      Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites,
+      TSamplerConfiguration tSamplerConfig, long batchTimeOut, String contextArg,
+      Map<String,String> executionHints)
+      throws ThriftSecurityException, TSampleNotPresentException {
+    // find all of the tables that need to be scanned
+    final HashSet<TableId> tables = new HashSet<>();
+    for (TKeyExtent keyExtent : tbatch.keySet()) {
+      tables.add(TableId.of(new String(keyExtent.getTable(), UTF_8)));
+    }
+
+    if (tables.size() != 1) {
+      throw new IllegalArgumentException("Cannot batch scan over multiple tables");
+    }
+
+    // check if user has permission to the tables
+    for (TableId tableId : tables) {
+      NamespaceId namespaceId = getNamespaceId(credentials, tableId);
+      if (!security.canScan(credentials, tableId, namespaceId, tbatch, tcolumns, ssiList, ssio,
+          authorizations)) {
+        throw new ThriftSecurityException(credentials.getPrincipal(),
+            SecurityErrorCode.PERMISSION_DENIED);
+      }
+    }
+
+    try {
+      if (!security.authenticatedUserHasAuthorizations(credentials, authorizations)) {
+        throw new ThriftSecurityException(credentials.getPrincipal(),
+            SecurityErrorCode.BAD_AUTHORIZATIONS);
+      }
+    } catch (ThriftSecurityException tse) {
+      log.error("{} is not authorized", credentials.getPrincipal(), tse);
+      throw tse;
+    }
+    Map<KeyExtent,List<Range>> batch = Translator.translate(tbatch, new TKeyExtentTranslator(),
+        new Translator.ListTranslator<>(new TRangeTranslator()));
+
+    // This is used to determine which thread pool to use
+    KeyExtent threadPoolExtent = batch.keySet().iterator().next();
+
+    if (waitForWrites) {
+      writeTracker.waitForWrites(TabletType.type(batch.keySet()));
+    }
+
+    Set<Column> columnSet = tcolumns.isEmpty() ? Collections.emptySet()
+        : new HashSet<>(Collections2.transform(tcolumns, Column::new));
+
+    ScanParameters scanParams =
+        new ScanParameters(-1, new Authorizations(authorizations), columnSet, ssiList, ssio, false,
+            SamplerConfigurationImpl.fromThrift(tSamplerConfig), batchTimeOut, contextArg);
+
+    final MultiScanSession mss =
+        new MultiScanSession(credentials, threadPoolExtent, batch, scanParams, executionHints);
+
+    mss.numTablets = batch.size();
+    for (List<Range> ranges : batch.values()) {
+      mss.numRanges += ranges.size();
+    }
+
+    long sid = server.sessionManager.createSession(mss, true);
+
+    MultiScanResult result;
+    try {
+      result = continueMultiScan(sid, mss);
+    } finally {
+      server.sessionManager.unreserveSession(sid);
+    }
+
+    return new InitialMultiScan(sid, result);
+  }
+
+  @Override
+  public MultiScanResult continueMultiScan(TInfo tinfo, long scanID)
+      throws NoSuchScanIDException, TSampleNotPresentException {
+
+    MultiScanSession session = (MultiScanSession) server.sessionManager.reserveSession(scanID);
+
+    if (session == null) {
+      throw new NoSuchScanIDException();
+    }
+
+    try {
+      return continueMultiScan(scanID, session);
+    } finally {
+      server.sessionManager.unreserveSession(session);
+    }
+  }
+
+  private MultiScanResult continueMultiScan(long scanID, MultiScanSession session)
+      throws TSampleNotPresentException {
+
+    if (session.lookupTask == null) {
+      session.lookupTask = new LookupTask(server, scanID);
+      server.resourceManager.executeReadAhead(session.threadPoolExtent,
+          getScanDispatcher(session.threadPoolExtent), session, session.lookupTask);
+    }
+
+    try {
+      MultiScanResult scanResult =
+          session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
+      session.lookupTask = null;
+      return scanResult;
+    } catch (ExecutionException e) {
+      server.sessionManager.removeSession(scanID);
+      if (e.getCause() instanceof SampleNotPresentException) {
+        throw new TSampleNotPresentException();
+      } else {
+        log.warn("Failed to get multiscan result", e);
+        throw new RuntimeException(e);
+      }
+    } catch (TimeoutException e1) {
+      long timeout = server.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
+      server.sessionManager.removeIfNotAccessed(scanID, timeout);
+      List<TKeyValue> results = Collections.emptyList();
+      Map<TKeyExtent,List<TRange>> failures = Collections.emptyMap();
+      List<TKeyExtent> fullScans = Collections.emptyList();
+      return new MultiScanResult(results, failures, fullScans, null, null, false, true);
+    } catch (Throwable t) {
+      server.sessionManager.removeSession(scanID);
+      log.warn("Failed to get multiscan result", t);
+      throw new RuntimeException(t);
+    }
+  }
+
+  @Override
+  public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
+    MultiScanSession session = (MultiScanSession) server.sessionManager.removeSession(scanID);
+    if (session == null) {
+      throw new NoSuchScanIDException();
+    }
+
+    long t2 = System.currentTimeMillis();
+
+    if (log.isTraceEnabled()) {
+      log.trace(String.format(
+          "MultiScanSess %s %,d entries in %.2f secs"
+              + " (lookup_time:%.2f secs tablets:%,d ranges:%,d) ",
+          TServerUtils.clientAddress.get(), session.numEntries, (t2 - session.startTime) / 1000.0,
+          session.totalLookupTime / 1000.0, session.numTablets, session.numRanges));
+    }
+  }
+
+  @Override
+  public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability tdurabilty)
+      throws ThriftSecurityException {
+    // Make sure user is real
+    Durability durability = DurabilityImpl.fromThrift(tdurabilty);
+    security.authenticateUser(credentials, credentials);
+    server.updateMetrics.addPermissionErrors(0);
+
+    UpdateSession us =
+        new UpdateSession(new TservConstraintEnv(server.getContext(), security, credentials),
+            credentials, durability);
+    return server.sessionManager.createSession(us, false);
+  }
+
+  private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
+    long t1 = System.currentTimeMillis();
+    if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent)) {
+      return;
+    }
+    if (us.currentTablet == null
+        && (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) {
+      // if there were previous failures, then do not accept additional writes
+      return;
+    }
+
+    TableId tableId = null;
+    try {
+      // if user has no permission to write to this table, add it to
+      // the failures list
+      boolean sameTable = us.currentTablet != null
+          && (us.currentTablet.getExtent().getTableId().equals(keyExtent.getTableId()));
+      tableId = keyExtent.getTableId();
+      if (sameTable || security.canWrite(us.getCredentials(), tableId,
+          Tables.getNamespaceId(server.getContext(), tableId))) {
+        long t2 = System.currentTimeMillis();
+        us.authTimes.addStat(t2 - t1);
+        us.currentTablet = server.getOnlineTablet(keyExtent);
+        if (us.currentTablet != null) {
+          us.queuedMutations.put(us.currentTablet, new ArrayList<>());
+        } else {
+          // not serving tablet, so report all mutations as
+          // failures
+          us.failures.put(keyExtent, 0L);
+          server.updateMetrics.addUnknownTabletErrors(0);
+        }
+      } else {
+        log.warn("Denying access to table {} for user {}", keyExtent.getTableId(), us.getUser());
+        long t2 = System.currentTimeMillis();
+        us.authTimes.addStat(t2 - t1);
+        us.currentTablet = null;
+        us.authFailures.put(keyExtent, SecurityErrorCode.PERMISSION_DENIED);
+        server.updateMetrics.addPermissionErrors(0);
+        return;
+      }
+    } catch (TableNotFoundException tnfe) {
+      log.error("Table " + tableId + " not found ", tnfe);
+      long t2 = System.currentTimeMillis();
+      us.authTimes.addStat(t2 - t1);
+      us.currentTablet = null;
+      us.authFailures.put(keyExtent, SecurityErrorCode.TABLE_DOESNT_EXIST);
+      server.updateMetrics.addUnknownTabletErrors(0);
+      return;
+    } catch (ThriftSecurityException e) {
+      log.error("Denying permission to check user " + us.getUser() + " with user " + e.getUser(),
+          e);
+      long t2 = System.currentTimeMillis();
+      us.authTimes.addStat(t2 - t1);
+      us.currentTablet = null;
+      us.authFailures.put(keyExtent, e.getCode());
+      server.updateMetrics.addPermissionErrors(0);
+      return;
+    }
+  }
+
+  @Override
+  public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
+      List<TMutation> tmutations) {
+    UpdateSession us = (UpdateSession) server.sessionManager.reserveSession(updateID);
+    if (us == null) {
+      return;
+    }
+
+    boolean reserved = true;
+    try {
+      KeyExtent keyExtent = new KeyExtent(tkeyExtent);
+      setUpdateTablet(us, keyExtent);
+
+      if (us.currentTablet != null) {
+        long additionalMutationSize = 0;
+        List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
+        for (TMutation tmutation : tmutations) {
+          Mutation mutation = new ServerMutation(tmutation);
+          mutations.add(mutation);
+          additionalMutationSize += mutation.numBytes();
+        }
+        us.queuedMutationSize += additionalMutationSize;
+        long totalQueued = server.updateTotalQueuedMutationSize(additionalMutationSize);
+        long total = server.getConfiguration().getAsBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);
+        if (totalQueued > total) {
+          try {
+            flush(us);
+          } catch (HoldTimeoutException hte) {
+            // Assumption is that the client has timed out and is gone. If that's not the case,
+            // then removing the session should cause the client to fail
+            // in such a way that it retries.
+            log.debug("HoldTimeoutException during applyUpdates, removing session");
+            server.sessionManager.removeSession(updateID, true);
+            reserved = false;
+          }
+        }
+      }
+    } finally {
+      if (reserved) {
+        server.sessionManager.unreserveSession(us);
+      }
+    }
+  }
+
+  private void flush(UpdateSession us) {
+
+    int mutationCount = 0;
+    Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
+    Map<CommitSession,TabletMutations> loggables = new HashMap<>();
+    Throwable error = null;
+
+    long pt1 = System.currentTimeMillis();
+
+    boolean containsMetadataTablet = false;
+    for (Tablet tablet : us.queuedMutations.keySet()) {
+      if (tablet.getExtent().isMeta()) {
+        containsMetadataTablet = true;
+      }
+    }
+
+    if (!containsMetadataTablet && us.queuedMutations.size() > 0) {
+      server.resourceManager.waitUntilCommitsAreEnabled();
+    }
+
+    try (TraceScope prep = Trace.startSpan("prep")) {
+      for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
+
+        Tablet tablet = entry.getKey();
+        Durability durability =
+            DurabilityImpl.resolveDurabilty(us.durability, tablet.getDurability());
+        List<Mutation> mutations = entry.getValue();
+        if (mutations.size() > 0) {
+          try {
+            server.updateMetrics.addMutationArraySize(mutations.size());
+
+            PreparedMutations prepared = tablet.prepareMutationsForCommit(us.cenv, mutations);
+
+            if (prepared.tabletClosed()) {
+              if (us.currentTablet == tablet) {
+                us.currentTablet = null;
+              }
+              us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
+            } else {
+              if (!prepared.getNonViolators().isEmpty()) {
+                List<Mutation> validMutations = prepared.getNonViolators();
+                CommitSession session = prepared.getCommitSession();
+                if (durability != Durability.NONE) {
+                  loggables.put(session, new TabletMutations(session, validMutations, durability));
+                }
+                sendables.put(session, validMutations);
+              }
+
+              if (!prepared.getViolations().isEmpty()) {
+                us.violations.add(prepared.getViolations());
+                server.updateMetrics.addConstraintViolations(0);
+              }
+              // Use the size of the original mutation list, regardless of how many mutations
+              // did not violate constraints.
+              mutationCount += mutations.size();
+
+            }
+          } catch (Throwable t) {
+            error = t;
+            log.error("Unexpected error preparing for commit", error);
+            break;
+          }
+        }
+      }
+    }
+
+    long pt2 = System.currentTimeMillis();
+    us.prepareTimes.addStat(pt2 - pt1);
+    updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size());
+
+    if (error != null) {
+      sendables.forEach((commitSession, value) -> commitSession.abortCommit());
+      throw new RuntimeException(error);
+    }
+    try {
+      try (TraceScope wal = Trace.startSpan("wal")) {
+        while (true) {
+          try {
+            long t1 = System.currentTimeMillis();
+
+            server.logger.logManyTablets(loggables);
+
+            long t2 = System.currentTimeMillis();
+            us.walogTimes.addStat(t2 - t1);
+            updateWalogWriteTime((t2 - t1));
+            break;
+          } catch (IOException | FSError ex) {
+            log.warn("logging mutations failed, retrying");
+          } catch (Throwable t) {
+            log.error("Unknown exception logging mutations, counts"
+                + " for mutations in flight not decremented!", t);
+            throw new RuntimeException(t);
+          }
+        }
+      }
+
+      try (TraceScope commit = Trace.startSpan("commit")) {
+        long t1 = System.currentTimeMillis();
+        sendables.forEach((commitSession, mutations) -> {
+          commitSession.commit(mutations);
+          KeyExtent extent = commitSession.getExtent();
+
+          if (us.currentTablet != null && extent == us.currentTablet.getExtent()) {
+            // because constraint violations may filter out some
+            // mutations, for proper accounting with the client code,
+            // need to increment the count based on the original
+            // number of mutations from the client NOT the filtered number
+            us.successfulCommits.increment(us.currentTablet,
+                us.queuedMutations.get(us.currentTablet).size());
+          }
+        });
+        long t2 = System.currentTimeMillis();
+
+        us.flushTime += (t2 - pt1);
+        us.commitTimes.addStat(t2 - t1);
+
+        updateAvgCommitTime(t2 - t1, sendables.size());
+      }
+    } finally {
+      us.queuedMutations.clear();
+      if (us.currentTablet != null) {
+        us.queuedMutations.put(us.currentTablet, new ArrayList<>());
+      }
+      server.updateTotalQueuedMutationSize(-us.queuedMutationSize);
+      us.queuedMutationSize = 0;
+    }
+    us.totalUpdates += mutationCount;
+  }
+
+  private void updateWalogWriteTime(long time) {
+    server.updateMetrics.addWalogWriteTime(time);
+  }
+
+  private void updateAvgCommitTime(long time, int size) {
+    if (size > 0)
+      server.updateMetrics.addCommitTime((long) (time / (double) size));
+  }
+
+  private void updateAvgPrepTime(long time, int size) {
+    if (size > 0)
+      server.updateMetrics.addCommitPrep((long) (time / (double) size));
+  }
+
+  @Override
+  public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException {
+    final UpdateSession us = (UpdateSession) server.sessionManager.removeSession(updateID);
+    if (us == null) {
+      throw new NoSuchScanIDException();
+    }
+
+    // clients may or may not see data from an update session while
+    // it is in progress, however when the update session is closed
+    // want to ensure that reads wait for the write to finish
+    long opid = writeTracker.startWrite(us.queuedMutations.keySet());
+
+    try {
+      flush(us);
+    } catch (HoldTimeoutException e) {
+      // Assumption is that the client has timed out and is gone. If that's not the case throw an
+      // exception that will cause it to retry.
+      log.debug("HoldTimeoutException during closeUpdate, reporting no such session");
+      throw new NoSuchScanIDException();
+    } finally {
+      writeTracker.finishWrite(opid);
+    }
+
+    if (log.isTraceEnabled()) {
+      log.trace(
+          String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)",
+              TServerUtils.clientAddress.get(), us.totalUpdates,
+              (System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(),
+              us.flushTime / 1000.0, us.prepareTimes.sum() / 1000.0, us.walogTimes.sum() / 1000.0,
+              us.commitTimes.sum() / 1000.0));
+    }
+    if (us.failures.size() > 0) {
+      Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next();
+      log.debug(String.format("Failures: %d, first extent %s successful commits: %d",
+          us.failures.size(), first.getKey().toString(), first.getValue()));
+    }
+    List<ConstraintViolationSummary> violations = us.violations.asList();
+    if (violations.size() > 0) {
+      ConstraintViolationSummary first = us.violations.asList().iterator().next();
+      log.debug(String.format("Violations: %d, first %s occurs %d", violations.size(),
+          first.violationDescription, first.numberOfViolatingMutations));
+    }
+    if (us.authFailures.size() > 0) {
+      KeyExtent first = us.authFailures.keySet().iterator().next();
+      log.debug(String.format("Authentication Failures: %d, first %s", us.authFailures.size(),
+          first.toString()));
+    }
+
+    return new UpdateErrors(Translator.translate(us.failures, Translators.KET),
+        Translator.translate(violations, Translators.CVST),
+        Translator.translate(us.authFailures, Translators.KET));
+  }
+
+  @Override
+  public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent,
+      TMutation tmutation, TDurability tdurability)
+      throws NotServingTabletException, ConstraintViolationException, ThriftSecurityException {
+
+    final TableId tableId = TableId.of(new String(tkeyExtent.getTable(), UTF_8));
+    NamespaceId namespaceId = getNamespaceId(credentials, tableId);
+    if (!security.canWrite(credentials, tableId, namespaceId)) {
+      throw new ThriftSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.PERMISSION_DENIED);
+    }
+    final KeyExtent keyExtent = new KeyExtent(tkeyExtent);
+    final Tablet tablet = server.getOnlineTablet(new KeyExtent(keyExtent));
+    if (tablet == null) {
+      throw new NotServingTabletException(tkeyExtent);
+    }
+    Durability tabletDurability = tablet.getDurability();
+
+    if (!keyExtent.isMeta()) {
+      try {
+        server.resourceManager.waitUntilCommitsAreEnabled();
+      } catch (HoldTimeoutException hte) {
+        // Major hack. Assumption is that the client has timed out and is gone. If that's not the
+        // case, then throwing the following will let client know there
+        // was a failure and it should retry.
+        throw new NotServingTabletException(tkeyExtent);
+      }
+    }
+
+    final long opid = writeTracker.startWrite(TabletType.type(keyExtent));
+
+    try {
+      final Mutation mutation = new ServerMutation(tmutation);
+      final List<Mutation> mutations = Collections.singletonList(mutation);
+
+      PreparedMutations prepared;
+      try (TraceScope prep = Trace.startSpan("prep")) {
+        prepared = tablet.prepareMutationsForCommit(
+            new TservConstraintEnv(server.getContext(), security, credentials), mutations);
+      }
+
+      if (prepared.tabletClosed()) {
+        throw new NotServingTabletException(tkeyExtent);
+      } else if (!prepared.getViolators().isEmpty()) {
+        throw new ConstraintViolationException(
+            Translator.translate(prepared.getViolations().asList(), Translators.CVST));
+      } else {
+        CommitSession session = prepared.getCommitSession();
+        Durability durability = DurabilityImpl
+            .resolveDurabilty(DurabilityImpl.fromThrift(tdurability), tabletDurability);
+
+        // Instead of always looping on true, skip completely when durability is NONE.
+        while (durability != Durability.NONE) {
+          try {
+            try (TraceScope wal = Trace.startSpan("wal")) {
+              server.logger.log(session, mutation, durability);
+            }
+            break;
+          } catch (IOException ex) {
+            log.warn("Error writing mutations to log", ex);
+          }
+        }
+
+        try (TraceScope commit = Trace.startSpan("commit")) {
+          session.commit(mutations);
+        }
+      }
+    } finally {
+      writeTracker.finishWrite(opid);
+    }
+  }
+
+  private NamespaceId getNamespaceId(TCredentials credentials, TableId tableId)
+      throws ThriftSecurityException {
+    try {
+      return Tables.getNamespaceId(server.getContext(), tableId);
+    } catch (TableNotFoundException e1) {
+      throw new ThriftSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.TABLE_DOESNT_EXIST);
+    }
+  }
+
+  private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates,
+      ArrayList<TCMResult> results, ConditionalSession cs, List<String> symbols)
+      throws IOException {
+    Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter = updates.entrySet().iterator();
+
+    final CompressedIterators compressedIters = new CompressedIterators(symbols);
+    ConditionCheckerContext checkerContext = new ConditionCheckerContext(server.getContext(),
+        compressedIters, server.getContext().getTableConfiguration(cs.tableId));
+
+    while (iter.hasNext()) {
+      final Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next();
+      final Tablet tablet = server.getOnlineTablet(entry.getKey());
+
+      if (tablet == null || tablet.isClosed()) {
+        for (ServerConditionalMutation scm : entry.getValue()) {
+          results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+        }
+        iter.remove();
+      } else {
+        final List<ServerConditionalMutation> okMutations =
+            new ArrayList<>(entry.getValue().size());
+        final List<TCMResult> resultsSubList = results.subList(results.size(), results.size());
+
+        ConditionChecker checker =
+            checkerContext.newChecker(entry.getValue(), okMutations, resultsSubList);
+        try {
+          tablet.checkConditions(checker, cs.auths, cs.interruptFlag);
+
+          if (okMutations.size() > 0) {
+            entry.setValue(okMutations);
+          } else {
+            iter.remove();
+          }
+        } catch (TabletClosedException | IterationInterruptedException | TooManyFilesException e) {
+          // clear anything added while checking conditions.
+          resultsSubList.clear();
+
+          for (ServerConditionalMutation scm : entry.getValue()) {
+            results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+          }
+          iter.remove();
+        }
+      }
+    }
+  }
+
+  private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates,
+      ArrayList<TCMResult> results, ConditionalSession sess) {
+    Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
+
+    Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
+    Map<CommitSession,TabletMutations> loggables = new HashMap<>();
+
+    boolean sessionCanceled = sess.interruptFlag.get();
+
+    try (TraceScope prepSpan = Trace.startSpan("prep")) {
+      long t1 = System.currentTimeMillis();
+      for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
+        final Tablet tablet = server.getOnlineTablet(entry.getKey());
+        if (tablet == null || tablet.isClosed() || sessionCanceled) {
+          addMutationsAsTCMResults(results, entry.getValue(), TCMStatus.IGNORED);
+        } else {
+          final Durability durability =
+              DurabilityImpl.resolveDurabilty(sess.durability, tablet.getDurability());
+
+          @SuppressWarnings("unchecked")
+          List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue();
+          if (!mutations.isEmpty()) {
+
+            PreparedMutations prepared = tablet.prepareMutationsForCommit(
+                new TservConstraintEnv(server.getContext(), security, sess.credentials), mutations);
+
+            if (prepared.tabletClosed()) {
+              addMutationsAsTCMResults(results, mutations, TCMStatus.IGNORED);
+            } else {
+              if (!prepared.getNonViolators().isEmpty()) {
+                // Only log and commit mutations that did not violate constraints.
+                List<Mutation> validMutations = prepared.getNonViolators();
+                addMutationsAsTCMResults(results, validMutations, TCMStatus.ACCEPTED);
+                CommitSession session = prepared.getCommitSession();
+                if (durability != Durability.NONE) {
+                  loggables.put(session, new TabletMutations(session, validMutations, durability));
+                }
+                sendables.put(session, validMutations);
+              }
+
+              if (!prepared.getViolators().isEmpty()) {
+                addMutationsAsTCMResults(results, prepared.getViolators(), TCMStatus.VIOLATED);
+              }
+            }
+          }
+        }
+      }
+
+      long t2 = System.currentTimeMillis();
+      updateAvgPrepTime(t2 - t1, es.size());
+    }
+
+    try (TraceScope walSpan = Trace.startSpan("wal")) {
+      while (loggables.size() > 0) {
+        try {
+          long t1 = System.currentTimeMillis();
+          server.logger.logManyTablets(loggables);
+          long t2 = System.currentTimeMillis();
+          updateWalogWriteTime(t2 - t1);
+          break;
+        } catch (IOException | FSError ex) {
+          log.warn("logging mutations failed, retrying");
+        } catch (Throwable t) {
+          log.error("Unknown exception logging mutations, counts for"
+              + " mutations in flight not decremented!", t);
+          throw new RuntimeException(t);
+        }
+      }
+    }
+
+    try (TraceScope commitSpan = Trace.startSpan("commit")) {
+      long t1 = System.currentTimeMillis();
+      sendables.forEach(CommitSession::commit);
+      long t2 = System.currentTimeMillis();
+      updateAvgCommitTime(t2 - t1, sendables.size());
+    }
+  }
+
+  /**
+   * Transform and add each mutation as a {@link TCMResult} with the mutation's ID and the specified
+   * status to the {@link TCMResult} list.
+   */
+  private void addMutationsAsTCMResults(final List<TCMResult> list,
+      final Collection<? extends Mutation> mutations, final TCMStatus status) {
+    mutations.stream()
+        .map(mutation -> new TCMResult(((ServerConditionalMutation) mutation).getID(), status))
+        .forEach(list::add);
+  }
+
+  private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(ConditionalSession cs,
+      Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results,
+      List<String> symbols) throws IOException {
+    // sort each list of mutations, this is done to avoid deadlock and doing seeks in order is
+    // more efficient and detect duplicate rows.
+    ConditionalMutationSet.sortConditionalMutations(updates);
+
+    Map<KeyExtent,List<ServerConditionalMutation>> deferred = new HashMap<>();
+
+    // can not process two mutations for the same row, because one will not see what the other
+    // writes
+    ConditionalMutationSet.deferDuplicatesRows(updates, deferred);
+
+    // get as many locks as possible w/o blocking... defer any rows that are locked
+    List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
+    try {
+      try (TraceScope checkSpan = Trace.startSpan("Check conditions")) {
+        checkConditions(updates, results, cs, symbols);
+      }
+
+      try (TraceScope updateSpan = Trace.startSpan("apply conditional mutations")) {
+        writeConditionalMutations(updates, results, cs);
+      }
+    } finally {
+      rowLocks.releaseRowLocks(locks);
+    }
+    return deferred;
+  }
+
+  @Override
+  public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials,
+      List<ByteBuffer> authorizations, String tableIdStr, TDurability tdurabilty,
+      String classLoaderContext) throws ThriftSecurityException, TException {
+
+    TableId tableId = TableId.of(tableIdStr);
+    Authorizations userauths = null;
+    NamespaceId namespaceId = getNamespaceId(credentials, tableId);
+    if (!security.canConditionallyUpdate(credentials, tableId, namespaceId)) {
+      throw new ThriftSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.PERMISSION_DENIED);
+    }
+
+    userauths = security.getUserAuthorizations(credentials);
+    for (ByteBuffer auth : authorizations) {
+      if (!userauths.contains(ByteBufferUtil.toBytes(auth))) {
+        throw new ThriftSecurityException(credentials.getPrincipal(),
+            SecurityErrorCode.BAD_AUTHORIZATIONS);
+      }
+    }
+
+    ConditionalSession cs = new ConditionalSession(credentials, new Authorizations(authorizations),
+        tableId, DurabilityImpl.fromThrift(tdurabilty));
+
+    long sid = server.sessionManager.createSession(cs, false);
+    return new TConditionalSession(sid, server.getLockID(), server.sessionManager.getMaxIdleTime());
+  }
+
+  @Override
+  public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID,
+      Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
+      throws NoSuchScanIDException, TException {
+
+    ConditionalSession cs = (ConditionalSession) server.sessionManager.reserveSession(sessID);
+
+    if (cs == null || cs.interruptFlag.get()) {
+      throw new NoSuchScanIDException();
+    }
+
+    if (!cs.tableId.equals(MetadataTable.ID) && !cs.tableId.equals(RootTable.ID)) {
+      try {
+        server.resourceManager.waitUntilCommitsAreEnabled();
+      } catch (HoldTimeoutException hte) {
+        // Assumption is that the client has timed out and is gone. If that's not the case throw
+        // an exception that will cause it to retry.
+        log.debug("HoldTimeoutException during conditionalUpdate, reporting no such session");
+        throw new NoSuchScanIDException();
+      }
+    }
+
+    TableId tid = cs.tableId;
+    long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
+
+    try {
+      Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations,
+          Translators.TKET, new Translator.ListTranslator<>(ServerConditionalMutation.TCMT));
+
+      for (KeyExtent ke : updates.keySet()) {
+        if (!ke.getTableId().equals(tid)) {
+          throw new IllegalArgumentException(
+              "Unexpected table id " + tid + " != " + ke.getTableId());
+        }
+      }
+
+      ArrayList<TCMResult> results = new ArrayList<>();
+
+      Map<KeyExtent,List<ServerConditionalMutation>> deferred =
+          conditionalUpdate(cs, updates, results, symbols);
+
+      while (deferred.size() > 0) {
+        deferred = conditionalUpdate(cs, deferred, results, symbols);
+      }
+
+      return results;
+    } catch (IOException ioe) {
+      throw new TException(ioe);
+    } finally {
+      writeTracker.finishWrite(opid);
+      server.sessionManager.unreserveSession(sessID);
+    }
+  }
+
+  @Override
+  public void invalidateConditionalUpdate(TInfo tinfo, long sessID) {
+    // this method should wait for any running conditional update to complete
+    // after this method returns a conditional update should not be able to start
+
+    ConditionalSession cs = (ConditionalSession) server.sessionManager.getSession(sessID);
+    if (cs != null) {
+      cs.interruptFlag.set(true);
+    }
+
+    cs = (ConditionalSession) server.sessionManager.reserveSession(sessID, true);
+    if (cs != null) {
+      server.sessionManager.removeSession(sessID, true);
+    }
+  }
+
+  @Override
+  public void closeConditionalUpdate(TInfo tinfo, long sessID) {
+    server.sessionManager.removeSession(sessID, false);
+  }
+
+  @Override
+  public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent,
+      ByteBuffer splitPoint) throws NotServingTabletException, ThriftSecurityException {
+
+    TableId tableId = TableId.of(new String(ByteBufferUtil.toBytes(tkeyExtent.table)));
+    NamespaceId namespaceId = getNamespaceId(credentials, tableId);
+
+    if (!security.canSplitTablet(credentials, tableId, namespaceId)) {
+      throw new ThriftSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.PERMISSION_DENIED);
+    }
+
+    KeyExtent keyExtent = new KeyExtent(tkeyExtent);
+
+    Tablet tablet = server.getOnlineTablet(keyExtent);
+    if (tablet == null) {
+      throw new NotServingTabletException(tkeyExtent);
+    }
+
+    if (keyExtent.getEndRow() == null
+        || !keyExtent.getEndRow().equals(ByteBufferUtil.toText(splitPoint))) {
+      try {
+        if (server.splitTablet(tablet, ByteBufferUtil.toBytes(splitPoint)) == null) {
+          throw new NotServingTabletException(tkeyExtent);
+        }
+      } catch (IOException e) {
+        log.warn("Failed to split " + keyExtent, e);
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Override
+  public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) {
+    return server.getStats(server.sessionManager.getActiveScansPerTable());
+  }
+
+  @Override
+  public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) {
+    List<TabletStats> result = new ArrayList<>();
+    TableId text = TableId.of(tableId);
+    KeyExtent start = new KeyExtent(text, new Text(), null);
+    for (Entry<KeyExtent,Tablet> entry : server.getOnlineTablets().tailMap(start).entrySet()) {
+      KeyExtent ke = entry.getKey();
+      if (ke.getTableId().compareTo(text) == 0) {
+        Tablet tablet = entry.getValue();
+        TabletStats stats = tablet.getTabletStats();
+        stats.extent = ke.toThrift();
+        stats.ingestRate = tablet.ingestRate();
+        stats.queryRate = tablet.queryRate();
+        stats.splitCreationTime = tablet.getSplitCreationTime();
+        stats.numEntries = tablet.getNumEntries();
+        result.add(stats);
+      }
+    }
+    return result;
+  }
+
+  private void checkPermission(TCredentials credentials, String lock, final String request)
+      throws ThriftSecurityException {
+    try {
+      log.trace("Got {} message from user: {}", request, credentials.getPrincipal());
+      if (!security.canPerformSystemActions(credentials)) {
+        log.warn("Got {} message from user: {}", request, credentials.getPrincipal());
+        throw new ThriftSecurityException(credentials.getPrincipal(),
+            SecurityErrorCode.PERMISSION_DENIED);
+      }
+    } catch (ThriftSecurityException e) {
+      log.warn("Got {} message from unauthenticatable user: {}", request, e.getUser());
+      if (server.getContext().getCredentials().getToken().getClass().getName()
+          .equals(credentials.getTokenClassName())) {
+        log.error("Got message from a service with a mismatched configuration."
+            + " Please ensure a compatible configuration.", e);
+      }
+      throw e;
+    }
+
+    if (server.getLock() == null || !server.getLock().wasLockAcquired()) {
+      log.debug("Got {} message before my lock was acquired, ignoring...", request);
+      throw new RuntimeException("Lock not acquired");
+    }
+
+    if (server.getLock() != null && server.getLock().wasLockAcquired()
+        && !server.getLock().isLocked()) {
+      Halt.halt(1, () -> {
+        log.info("Tablet server no longer holds lock during checkPermission() : {}, exiting",
+            request);
+        server.gcLogger.logGCInfo(server.getConfiguration());
+      });
+    }
+
+    if (lock != null) {
+      ZooUtil.LockID lid =
+          new ZooUtil.LockID(server.getContext().getZooKeeperRoot() + Constants.ZMASTER_LOCK, lock);
+
+      try {
+        if (!ZooLock.isLockHeld(server.masterLockCache, lid)) {
+          // maybe the cache is out of date and a new master holds the
+          // lock?
+          server.masterLockCache.clear();
+          if (!ZooLock.isLockHeld(server.masterLockCache, lid)) {
+            log.warn("Got {} message from a master that does not hold the current lock {}", request,
+                lock);
+            throw new RuntimeException("bad master lock");
+          }
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("bad master lock", e);
+      }
+    }
+  }
+
+  @Override
+  public void loadTablet(TInfo tinfo, TCredentials credentials, String lock,
+      final TKeyExtent textent) {
+
+    try {
+      checkPermission(credentials, lock, "loadTablet");
+    } catch (ThriftSecurityException e) {
+      log.error("Caller doesn't have permission to load a tablet", e);
+      throw new RuntimeException(e);
+    }
+
+    final KeyExtent extent = new KeyExtent(textent);
+
+    synchronized (server.unopenedTablets) {
+      synchronized (server.openingTablets) {
+        synchronized (server.onlineTablets) {
+
+          // checking if this exact tablet is in any of the sets
+          // below is not a strong enough check
+          // when splits and fix splits occurring
+
+          Set<KeyExtent> unopenedOverlapping =
+              KeyExtent.findOverlapping(extent, server.unopenedTablets);
+          Set<KeyExtent> openingOverlapping =
+              KeyExtent.findOverlapping(extent, server.openingTablets);
+          Set<KeyExtent> onlineOverlapping =
+              KeyExtent.findOverlapping(extent, server.getOnlineTablets());
+
+          Set<KeyExtent> all = new HashSet<>();
+          all.addAll(unopenedOverlapping);
+          all.addAll(openingOverlapping);
+          all.addAll(onlineOverlapping);
+
+          if (!all.isEmpty()) {
+
+            // ignore any tablets that have recently split, for error logging
+            for (KeyExtent e2 : onlineOverlapping) {
+              Tablet tablet = server.getOnlineTablet(e2);
+              if (System.currentTimeMillis() - tablet.getSplitCreationTime()
+                  < RECENTLY_SPLIT_MILLIES) {
+                all.remove(e2);
+              }
+            }
+
+            // ignore self, for error logging
+            all.remove(extent);
+
+            if (all.size() > 0) {
+              log.error("Tablet {} overlaps previously assigned {} {} {}", extent,
+                  unopenedOverlapping, openingOverlapping, onlineOverlapping + " " + all);
+            }
+            return;
+          }
+
+          server.unopenedTablets.add(extent);
+        }
+      }
+    }
+
+    TabletLogger.loading(extent, server.getTabletSession());
+
+    final AssignmentHandler ah = new AssignmentHandler(server, extent);
+    // final Runnable ah = new LoggingRunnable(log, );
+    // Root tablet assignment must take place immediately
+
+    if (extent.isRootTablet()) {
+      new Daemon("Root Tablet Assignment") {
+        @Override
+        public void run() {
+          ah.run();
+          if (server.getOnlineTablets().containsKey(extent)) {
+            log.info("Root tablet loaded: {}", extent);
+          } else {
+            log.info("Root tablet failed to load");
+          }
+
+        }
+      }.start();
+    } else {
+      if (extent.isMeta()) {
+        server.resourceManager.addMetaDataAssignment(extent, log, ah);
+      } else {
+        server.resourceManager.addAssignment(extent, log, ah);
+      }
+    }
+  }
+
+  @Override
+  public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent,
+      TUnloadTabletGoal goal, long requestTime) {
+    try {
+      checkPermission(credentials, lock, "unloadTablet");
+    } catch (ThriftSecurityException e) {
+      log.error("Caller doesn't have permission to unload a tablet", e);
+      throw new RuntimeException(e);
+    }
+
+    KeyExtent extent = new KeyExtent(textent);
+
+    server.resourceManager.addMigration(extent,
+        new LoggingRunnable(log, new UnloadTabletHandler(server, extent, goal, requestTime)));
+  }
+
+  @Override
+  public void flush(TInfo tinfo, TCredentials credentials, String lock, String tableId,
+      ByteBuffer startRow, ByteBuffer endRow) {
+    try {
+      checkPermission(credentials, lock, "flush");
+    } catch (ThriftSecurityException e) {
+      log.error("Caller doesn't have permission to flush a table", e);
+      throw new RuntimeException(e);
+    }
+
+    ArrayList<Tablet> tabletsToFlush = new ArrayList<>();
+
+    KeyExtent ke = new KeyExtent(TableId.of(tableId), ByteBufferUtil.toText(endRow),
+        ByteBufferUtil.toText(startRow));
+
+    for (Tablet tablet : server.getOnlineTablets().values()) {
+      if (ke.overlaps(tablet.getExtent())) {
+        tabletsToFlush.add(tablet);
+      }
+    }
+
+    Long flushID = null;
+
+    for (Tablet tablet : tabletsToFlush) {
+      if (flushID == null) {
+        // read the flush id once from zookeeper instead of reading
+        // it for each tablet
+        try {
+          flushID = tablet.getFlushID();
+        } catch (NoNodeException e) {
+          // table was probably deleted
+          log.info("Asked to flush table that has no flush id {} {}", ke, e.getMessage());
+          return;
+        }
+      }
+      tablet.flush(flushID);
+    }
+  }
+
+  @Override
+  public void flushTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) {
+    try {
+      checkPermission(credentials, lock, "flushTablet");
+    } catch (ThriftSecurityException e) {
+      log.error("Caller doesn't have permission to flush a tablet", e);
+      throw new RuntimeException(e);
+    }
+
+    Tablet tablet = server.getOnlineTablet(new KeyExtent(textent));
+    if (tablet != null) {
+      log.info("Flushing {}", tablet.getExtent());
+      try {
+        tablet.flush(tablet.getFlushID());
+      } catch (NoNodeException nne) {
+        log.info("Asked to flush tablet that has no flush id {} {}", new KeyExtent(textent),
+            nne.getMessage());
+      }
+    }
+  }
+
+  @Override
+  public void halt(TInfo tinfo, TCredentials credentials, String lock)
+      throws ThriftSecurityException {
+
+    checkPermission(credentials, lock, "halt");
+
+    Halt.halt(0, () -> {
+      log.info("Master requested tablet server halt");
+      server.gcLogger.logGCInfo(server.getConfiguration());
+      server.requestStop();
+      try {
+        server.getLock().unlock();
+      } catch (Exception e) {
+        log.error("Caught exception unlocking TabletServer lock", e);
+      }
+    });
+  }
+
+  @Override
+  public void fastHalt(TInfo info, TCredentials credentials, String lock) {
+    try {
+      halt(info, credentials, lock);
+    } catch (Exception e) {
+      log.warn("Error halting", e);
+    }
+  }
+
+  @Override
+  public TabletStats getHistoricalStats(TInfo tinfo, TCredentials credentials) {
+    return server.statsKeeper.getTabletStats();
+  }
+
+  @Override
+  public List<ActiveScan> getActiveScans(TInfo tinfo, TCredentials credentials)
+      throws ThriftSecurityException, TException {
+    try {
+      checkPermission(credentials, null, "getScans");
+    } catch (ThriftSecurityException e) {
+      log.error("Caller doesn't have permission to get active scans", e);
+      throw e;
+    }
+
+    return server.sessionManager.getActiveScans();
+  }
+
+  @Override
+  public void chop(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) {
+    try {
+      checkPermission(credentials, lock, "chop");
+    } catch (ThriftSecurityException e) {
+      log.error("Caller doesn't have permission to chop extent", e);
+      throw new RuntimeException(e);
+    }
+
+    KeyExtent ke = new KeyExtent(textent);
+
+    Tablet tablet = server.getOnlineTablet(ke);
+    if (tablet != null) {
+      tablet.chopFiles();
+    }
+  }
+
+  @Override
+  public void compact(TInfo tinfo, TCredentials credentials, String lock, String tableId,
+      ByteBuffer startRow, ByteBuffer endRow) {
+    try {
+      checkPermission(credentials, lock, "compact");
+    } catch (ThriftSecurityException e) {
+      log.error("Caller doesn't have permission to compact a table", e);
+      throw new RuntimeException(e);
+    }
+
+    KeyExtent ke = new KeyExtent(TableId.of(tableId), ByteBufferUtil.toText(endRow),
+        ByteBufferUtil.toText(startRow));
+
+    ArrayList<Tablet> tabletsToCompact = new ArrayList<>();
+
+    for (Tablet tablet : server.getOnlineTablets().values()) {
+      if (ke.overlaps(tablet.getExtent())) {
+        tabletsToCompact.add(tablet);
+      }
+    }
+
+    Pair<Long,UserCompactionConfig> compactionInfo = null;
+
+    for (Tablet tablet : tabletsToCompact) {
+      // all for the same table id, so only need to read
+      // compaction id once
+      if (compactionInfo == null) {
+        try {
+          compactionInfo = tablet.getCompactionID();
+        } catch (NoNodeException e) {
+          log.info("Asked to compact table with no compaction id {} {}", ke, e.getMessage());
+          return;
+        }
+      }
+      tablet.compactAll(compactionInfo.getFirst(), compactionInfo.getSecond());
+    }
+
+  }
+
+  @Override
+  public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials)
+      throws ThriftSecurityException, TException {
+    try {
+      checkPermission(credentials, null, "getActiveCompactions");
+    } catch (ThriftSecurityException e) {
+      log.error("Caller doesn't have permission to get active compactions", e);
+      throw e;
+    }
+
+    List<CompactionInfo> compactions = Compactor.getRunningCompactions();
+    List<ActiveCompaction> ret = new ArrayList<>(compactions.size());
+
+    for (CompactionInfo compactionInfo : compactions) {
+      ret.add(compactionInfo.toThrift());
+    }
+
+    return ret;
+  }
+
+  @Override
+  public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) {
+    String log = server.logger.getLogFile();
+    // Might be null if there no active logger
+    if (log == null) {
+      return Collections.emptyList();
+    }
+    return Collections.singletonList(log);
+  }
+
+  @Override
+  public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) {
+    log.warn("Garbage collector is attempting to remove logs through the tablet server");
+    log.warn("This is probably because your file"
+        + " Garbage Collector is an older version than your tablet servers.\n"
+        + "Restart your file Garbage Collector.");
+  }
+
+  private TSummaries getSummaries(Future<SummaryCollection> future) throws TimeoutException {
+    try {
+      SummaryCollection sc =
+          future.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
+      return sc.toThrift();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private TSummaries handleTimeout(long sessionId) {
+    long timeout = server.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
+    server.sessionManager.removeIfNotAccessed(sessionId, timeout);
+    return new TSummaries(false, sessionId, -1, -1, null);
+  }
+
+  private TSummaries startSummaryOperation(TCredentials credentials,
+      Future<SummaryCollection> future) {
+    try {
+      return getSummaries(future);
+    } catch (TimeoutException e) {
+      long sid =
+          server.sessionManager.createSession(new SummarySession(credentials, future), false);
+      while (sid == 0) {
+        server.sessionManager.removeSession(sid);
+        sid = server.sessionManager.createSession(new SummarySession(credentials, future), false);
+      }
+      return handleTimeout(sid);
+    }
+  }
+
+  @Override
+  public TSummaries startGetSummaries(TInfo tinfo, TCredentials credentials,
+      TSummaryRequest request)
+      throws ThriftSecurityException, ThriftTableOperationException, TException {
+    NamespaceId namespaceId;
+    TableId tableId = TableId.of(request.getTableId());
+    try {
+      namespaceId = Tables.getNamespaceId(server.getContext(), tableId);
+    } catch (TableNotFoundException e1) {
+      throw new ThriftTableOperationException(tableId.canonical(), null, null,
+          TableOperationExceptionType.NOTFOUND, null);
+    }
+
+    if (!security.canGetSummaries(credentials, tableId, namespaceId)) {
+      throw new AccumuloSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
+    }
+
+    ExecutorService es = server.resourceManager.getSummaryPartitionExecutor();
+    Future<SummaryCollection> future = new Gatherer(server.getContext(), request,
+        server.getContext().getTableConfiguration(tableId), server.getContext().getCryptoService())
+            .gather(es);
+
+    return startSummaryOperation(credentials, future);
+  }
+
+  @Override
+  public TSummaries startGetSummariesForPartition(TInfo tinfo, TCredentials credentials,
+      TSummaryRequest request, int modulus, int remainder)
+      throws ThriftSecurityException, TException {
+    // do not expect users to call this directly, expect other tservers to call this method
+    if (!security.canPerformSystemActions(credentials)) {
+      throw new AccumuloSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
+    }
+
+    ExecutorService spe = server.resourceManager.getSummaryRemoteExecutor();
+    TableConfiguration tableConfig =
+        server.getContext().getTableConfiguration(TableId.of(request.getTableId()));
+    Future<SummaryCollection> future = new Gatherer(server.getContext(), request, tableConfig,
+        server.getContext().getCryptoService()).processPartition(spe, modulus, remainder);
+
+    return startSummaryOperation(credentials, future);
+  }
+
+  @Override
+  public TSummaries startGetSummariesFromFiles(TInfo tinfo, TCredentials credentials,
+      TSummaryRequest request, Map<String,List<TRowRange>> files)
+      throws ThriftSecurityException, TException {
+    // do not expect users to call this directly, expect other tservers to call this method
+    if (!security.canPerformSystemActions(credentials)) {
+      throw new AccumuloSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
+    }
+
+    ExecutorService srp = server.resourceManager.getSummaryRetrievalExecutor();
+    TableConfiguration tableCfg =
+        server.getContext().getTableConfiguration(TableId.of(request.getTableId()));
+    BlockCache summaryCache = server.resourceManager.getSummaryCache();
+    BlockCache indexCache = server.resourceManager.getIndexCache();
+    Cache<String,Long> fileLenCache = server.resourceManager.getFileLenCache();
+    FileSystemResolver volMgr = p -> fs.getFileSystemByPath(p);
+    Future<SummaryCollection> future =
+        new Gatherer(server.getContext(), request, tableCfg, server.getContext().getCryptoService())
+            .processFiles(volMgr, files, summaryCache, indexCache, fileLenCache, srp);
+
+    return startSummaryOperation(credentials, future);
+  }
+
+  @Override
+  public TSummaries contiuneGetSummaries(TInfo tinfo, long sessionId)
+      throws NoSuchScanIDException, TException {
+    SummarySession session = (SummarySession) server.sessionManager.getSession(sessionId);
+    if (session == null) {
+      throw new NoSuchScanIDException();
+    }
+
+    Future<SummaryCollection> future = session.getFuture();
+    try {
+      TSummaries tsums = getSummaries(future);
+      server.sessionManager.removeSession(sessionId);
+      return tsums;
+    } catch (TimeoutException e) {
+      return handleTimeout(sessionId);
+    }
+  }
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java
new file mode 100644
index 0000000..3f9bbec
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TabletLoadState;
+import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
+import org.apache.accumulo.server.master.state.DistributedStoreException;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
+import org.apache.accumulo.server.master.state.TabletStateStore;
+import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class UnloadTabletHandler implements Runnable {
+  private static final Logger log = LoggerFactory.getLogger(UnloadTabletHandler.class);
+  private final KeyExtent extent;
+  private final TUnloadTabletGoal goalState;
+  private final long requestTimeSkew;
+  private final TabletServer server;
+
+  public UnloadTabletHandler(TabletServer server, KeyExtent extent, TUnloadTabletGoal goalState,
+      long requestTime) {
+    this.extent = extent;
+    this.goalState = goalState;
+    this.server = server;
+    this.requestTimeSkew = requestTime - MILLISECONDS.convert(System.nanoTime(), NANOSECONDS);
+  }
+
+  @Override
+  public void run() {
+
+    Tablet t = null;
+
+    synchronized (server.unopenedTablets) {
+      if (server.unopenedTablets.contains(extent)) {
+        server.unopenedTablets.remove(extent);
+        // enqueueMasterMessage(new TabletUnloadedMessage(extent));
+        return;
+      }
+    }
+    synchronized (server.openingTablets) {
+      while (server.openingTablets.contains(extent)) {
+        try {
+          server.openingTablets.wait();
+        } catch (InterruptedException e) {}
+      }
+    }
+    synchronized (server.onlineTablets) {
+      if (server.onlineTablets.snapshot().containsKey(extent)) {
+        t = server.onlineTablets.snapshot().get(extent);
+      }
+    }
+
+    if (t == null) {
+      // Tablet has probably been recently unloaded: repeated master
+      // unload request is crossing the successful unloaded message
+      if (!server.recentlyUnloadedCache.containsKey(extent)) {
+        log.info("told to unload tablet that was not being served {}", extent);
+        server.enqueueMasterMessage(
+            new TabletStatusMessage(TabletLoadState.UNLOAD_FAILURE_NOT_SERVING, extent));
+      }
+      return;
+    }
+
+    try {
+      t.close(!goalState.equals(TUnloadTabletGoal.DELETED));
+    } catch (Throwable e) {
+
+      if ((t.isClosing() || t.isClosed()) && e instanceof IllegalStateException) {
+        log.debug("Failed to unload tablet {}... it was already closing or closed : {}", extent,
+            e.getMessage());
+      } else {
+        log.error("Failed to close tablet {}... Aborting migration", extent, e);
+        server.enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.UNLOAD_ERROR, extent));
+      }
+      return;
+    }
+
+    // stop serving tablet - client will get not serving tablet
+    // exceptions
+    server.recentlyUnloadedCache.put(extent, System.currentTimeMillis());
+    server.onlineTablets.remove(extent);
+
+    try {
+      TServerInstance instance =
+          new TServerInstance(server.clientAddress, server.getLock().getSessionId());
+      TabletLocationState tls = null;
+      try {
+        tls = new TabletLocationState(extent, null, instance, null, null, null, false);
+      } catch (BadLocationStateException e) {
+        log.error("Unexpected error", e);
+      }
+      if (!goalState.equals(TUnloadTabletGoal.SUSPENDED) || extent.isRootTablet()
+          || (extent.isMeta()
+              && !server.getConfiguration().getBoolean(Property.MASTER_METADATA_SUSPENDABLE))) {
+        TabletStateStore.unassign(server.getContext(), tls, null);
+      } else {
+        TabletStateStore.suspend(server.getContext(), tls, null,
+            requestTimeSkew + MILLISECONDS.convert(System.nanoTime(), NANOSECONDS));
+      }
+    } catch (DistributedStoreException ex) {
+      log.warn("Unable to update storage", ex);
+    } catch (KeeperException e) {
+      log.warn("Unable determine our zookeeper session information", e);
+    } catch (InterruptedException e) {
+      log.warn("Interrupted while getting our zookeeper session information", e);
+    }
+
+    // tell the master how it went
+    server.enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.UNLOADED, extent));
+
+    // roll tablet stats over into tablet server's statsKeeper object as
+    // historical data
+    server.statsKeeper.saveMajorMinorTimes(t.getTabletStats());
+  }
+}