You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2014/01/10 16:15:37 UTC

[2/7] ACCUMULO-2160 back-port real bugs found by findbugs

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cb50a743/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index aa52834,0000000..c5695bc
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@@ -1,3602 -1,0 +1,3602 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.tabletserver;
 +
 +import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
 +
 +import java.io.EOFException;
 +import java.io.File;
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.lang.management.GarbageCollectorMXBean;
 +import java.lang.management.ManagementFactory;
 +import java.lang.reflect.Field;
 +import java.net.InetSocketAddress;
 +import java.net.Socket;
 +import java.net.UnknownHostException;
 +import java.nio.ByteBuffer;
 +import java.security.SecureRandom;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.EnumMap;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.SortedSet;
 +import java.util.TimerTask;
 +import java.util.TreeMap;
 +import java.util.TreeSet;
 +import java.util.UUID;
 +import java.util.concurrent.ArrayBlockingQueue;
 +import java.util.concurrent.BlockingDeque;
 +import java.util.concurrent.CancellationException;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.LinkedBlockingDeque;
 +import java.util.concurrent.RunnableFuture;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TimeoutException;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +import javax.management.ObjectName;
 +import javax.management.StandardMBean;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.impl.ScannerImpl;
 +import org.apache.accumulo.core.client.impl.TabletType;
 +import org.apache.accumulo.core.client.impl.Translator;
 +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.constraints.Constraint.Environment;
 +import org.apache.accumulo.core.constraints.Violations;
 +import org.apache.accumulo.core.data.Column;
 +import org.apache.accumulo.core.data.ConstraintViolationSummary;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.data.thrift.InitialMultiScan;
 +import org.apache.accumulo.core.data.thrift.InitialScan;
 +import org.apache.accumulo.core.data.thrift.IterInfo;
 +import org.apache.accumulo.core.data.thrift.MapFileInfo;
 +import org.apache.accumulo.core.data.thrift.MultiScanResult;
 +import org.apache.accumulo.core.data.thrift.ScanResult;
 +import org.apache.accumulo.core.data.thrift.TColumn;
 +import org.apache.accumulo.core.data.thrift.TKey;
 +import org.apache.accumulo.core.data.thrift.TKeyExtent;
 +import org.apache.accumulo.core.data.thrift.TKeyValue;
 +import org.apache.accumulo.core.data.thrift.TMutation;
 +import org.apache.accumulo.core.data.thrift.TRange;
 +import org.apache.accumulo.core.data.thrift.UpdateErrors;
 +import org.apache.accumulo.core.file.FileUtil;
 +import org.apache.accumulo.core.iterators.IterationInterruptedException;
 +import org.apache.accumulo.core.master.thrift.Compacting;
 +import org.apache.accumulo.core.master.thrift.MasterClientService;
 +import org.apache.accumulo.core.master.thrift.TableInfo;
 +import org.apache.accumulo.core.master.thrift.TabletLoadState;
 +import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.SecurityUtil;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
 +import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 +import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
 +import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 +import org.apache.accumulo.core.tabletserver.thrift.ScanState;
 +import org.apache.accumulo.core.tabletserver.thrift.ScanType;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 +import org.apache.accumulo.core.util.AddressUtil;
 +import org.apache.accumulo.core.util.ByteBufferUtil;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.core.util.ColumnFQ;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.util.LoggingRunnable;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.ServerServices;
 +import org.apache.accumulo.core.util.ServerServices.Service;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
 +import org.apache.accumulo.core.util.Stat;
 +import org.apache.accumulo.core.util.ThriftUtil;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 +import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 +import org.apache.accumulo.server.Accumulo;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.client.ClientServiceHandler;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.conf.TableConfiguration;
 +import org.apache.accumulo.server.data.ServerMutation;
 +import org.apache.accumulo.server.logger.LogFileKey;
 +import org.apache.accumulo.server.logger.LogFileValue;
 +import org.apache.accumulo.server.master.state.Assignment;
 +import org.apache.accumulo.server.master.state.DistributedStoreException;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.master.state.TabletLocationState;
 +import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
 +import org.apache.accumulo.server.master.state.TabletStateStore;
 +import org.apache.accumulo.server.master.state.ZooTabletStateStore;
 +import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
 +import org.apache.accumulo.server.problems.ProblemReport;
 +import org.apache.accumulo.server.problems.ProblemReports;
 +import org.apache.accumulo.server.security.AuditedSecurityOperation;
 +import org.apache.accumulo.server.security.SecurityConstants;
 +import org.apache.accumulo.server.security.SecurityOperation;
 +import org.apache.accumulo.server.tabletserver.Compactor.CompactionInfo;
 +import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
 +import org.apache.accumulo.server.tabletserver.Tablet.KVEntry;
 +import org.apache.accumulo.server.tabletserver.Tablet.LookupResult;
 +import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
 +import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
 +import org.apache.accumulo.server.tabletserver.Tablet.ScanBatch;
 +import org.apache.accumulo.server.tabletserver.Tablet.Scanner;
 +import org.apache.accumulo.server.tabletserver.Tablet.SplitInfo;
 +import org.apache.accumulo.server.tabletserver.Tablet.TConstraintViolationException;
 +import org.apache.accumulo.server.tabletserver.Tablet.TabletClosedException;
 +import org.apache.accumulo.server.tabletserver.TabletServerResourceManager.TabletResourceManager;
 +import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation;
 +import org.apache.accumulo.server.tabletserver.log.DfsLogger;
 +import org.apache.accumulo.server.tabletserver.log.LogSorter;
 +import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
 +import org.apache.accumulo.server.tabletserver.log.TabletServerLogger;
 +import org.apache.accumulo.server.tabletserver.mastermessage.MasterMessage;
 +import org.apache.accumulo.server.tabletserver.mastermessage.SplitReportMessage;
 +import org.apache.accumulo.server.tabletserver.mastermessage.TabletStatusMessage;
 +import org.apache.accumulo.server.tabletserver.metrics.TabletServerMBean;
 +import org.apache.accumulo.server.tabletserver.metrics.TabletServerMinCMetrics;
 +import org.apache.accumulo.server.tabletserver.metrics.TabletServerScanMetrics;
 +import org.apache.accumulo.server.tabletserver.metrics.TabletServerUpdateMetrics;
 +import org.apache.accumulo.server.trace.TraceFileSystem;
 +import org.apache.accumulo.server.util.FileSystemMonitor;
 +import org.apache.accumulo.server.util.Halt;
 +import org.apache.accumulo.server.util.MapCounter;
 +import org.apache.accumulo.server.util.MetadataTable;
 +import org.apache.accumulo.server.util.MetadataTable.LogEntry;
 +import org.apache.accumulo.server.util.TServerUtils;
 +import org.apache.accumulo.server.util.TServerUtils.ServerPort;
 +import org.apache.accumulo.server.util.time.RelativeTime;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 +import org.apache.accumulo.server.zookeeper.TransactionWatcher;
 +import org.apache.accumulo.server.zookeeper.ZooCache;
 +import org.apache.accumulo.server.zookeeper.ZooLock;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.start.Platform;
 +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 +import org.apache.accumulo.start.classloader.vfs.ContextManager;
 +import org.apache.accumulo.trace.instrument.Span;
 +import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
 +import org.apache.accumulo.trace.thrift.TInfo;
 +import org.apache.commons.collections.map.LRUMap;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.fs.Trash;
 +import org.apache.hadoop.hdfs.DFSConfigKeys;
 +import org.apache.hadoop.hdfs.DistributedFileSystem;
 +import org.apache.hadoop.io.SequenceFile;
 +import org.apache.hadoop.io.SequenceFile.Reader;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.TProcessor;
 +import org.apache.thrift.TServiceClient;
 +import org.apache.thrift.server.TServer;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.KeeperException.NoNodeException;
 +
 +/**
 + * Execution phase of a scan task, as reported by ScanTask.getScanRunState().
 + */
 +enum ScanRunState {
 +  /** Phase assigned to every ScanTask when it is constructed. */
 +  QUEUED,
 +  /** Phase that SessionManager.getActiveScans() reports as a running scan. */
 +  RUNNING,
 +  /** Phase for which ScanTask.isDone() returns true. */
 +  FINISHED
 +}
 +
 +public class TabletServer extends AbstractMetricsImpl implements org.apache.accumulo.server.tabletserver.metrics.TabletServerMBean {
 +  // Logger shared by TabletServer and its nested classes.
 +  private static final Logger log = Logger.getLogger(TabletServer.class);
 +  
 +  // Bookkeeping carried from one logGCInfo() call to the next.
 +  // collector name -> cumulative collection time reported by that collector on the previous call
 +  private static HashMap<String,Long> prevGcTime = new HashMap<String,Long>();
 +  // value of Runtime.freeMemory() recorded at the end of the previous call
 +  private static long lastMemorySize = 0;
 +  // number of consecutive calls in which a collector's time grew; reset when no growth is seen or after a low-memory warning
 +  private static long gcTimeIncreasedCount;
 +  
 +  private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
 +  
 +  private TabletServerLogger logger;
 +  
 +  protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
 +  
 +  // assigned in the constructor; getSystemConfiguration() reads the system configuration from it
 +  private ServerConfiguration serverConfig;
 +  // created in the constructor
 +  private LogSorter logSorter = null;
 +  
 +  public TabletServer(ServerConfiguration conf, FileSystem fs) {
 +    super();
 +    this.serverConfig = conf;
 +    this.instance = conf.getInstance();
 +    this.fs = TraceFileSystem.wrap(fs);
 +    this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
 +    SimpleTimer.getInstance().schedule(new Runnable() {
 +      @Override
 +      public void run() {
 +        synchronized (onlineTablets) {
 +          long now = System.currentTimeMillis();
 +          for (Tablet tablet : onlineTablets.values())
 +            try {
 +              tablet.updateRates(now);
 +            } catch (Exception ex) {
 +              log.error(ex, ex);
 +            }
 +        }
 +      }
 +    }, 5000, 5000);
 +  }
 +  
 +  /**
 +   * Logs garbage-collection times and free memory, and halts the process when a collector's time grew by more than the configured ZooKeeper timeout since the
 +   * previous call.
 +   * 
 +   * A debug line is written only when some collector's time changed or free memory grew. A warning is written when collection time has grown on more than
 +   * three consecutive calls and free memory is below 5% of the maximum.
 +   * 
 +   * @param conf
 +   *          supplies Property.INSTANCE_ZK_TIMEOUT, the threshold for halting
 +   */
 +  private synchronized static void logGCInfo(AccumuloConfiguration conf) {
 +    final Runtime runtime = Runtime.getRuntime();
 +    final StringBuilder report = new StringBuilder("gc");
 +    boolean changed = false;
 +    long largestIncrease = 0;
 +    
 +    for (GarbageCollectorMXBean collector : ManagementFactory.getGarbageCollectorMXBeans()) {
 +      final String collectorName = collector.getName();
 +      final Long previous = prevGcTime.get(collectorName);
 +      final long total = collector.getCollectionTime();
 +      // a collector seen for the first time is compared against zero
 +      final long increase = total - (previous == null ? 0L : previous.longValue());
 +      
 +      if (increase != 0) {
 +        changed = true;
 +      }
 +      
 +      report.append(String.format(" %s=%,.2f(+%,.2f) secs", collectorName, total / 1000.0, increase / 1000.0));
 +      largestIncrease = Math.max(increase, largestIncrease);
 +      prevGcTime.put(collectorName, total);
 +    }
 +    
 +    final long freeMem = runtime.freeMemory();
 +    if (largestIncrease == 0) {
 +      gcTimeIncreasedCount = 0;
 +    } else if (++gcTimeIncreasedCount > 3 && freeMem < runtime.maxMemory() * 0.05) {
 +      log.warn("Running low on memory");
 +      gcTimeIncreasedCount = 0;
 +    }
 +    
 +    if (freeMem > lastMemorySize) {
 +      changed = true;
 +    }
 +    
 +    // a negative delta already prints its own minus sign
 +    final long memDelta = freeMem - lastMemorySize;
 +    final String sign = memDelta > 0 ? "+" : "";
 +    report.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", freeMem, sign, memDelta, runtime.totalMemory()));
 +    
 +    if (changed) {
 +      log.debug(report.toString());
 +    }
 +    
 +    final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
 +    if (largestIncrease > keepAliveTimeout) {
 +      Halt.halt("Garbage collection may be interfering with lock keep-alive.  Halting.", -1);
 +    }
 +    
 +    lastMemorySize = freeMem;
 +  }
 +  
 +  private TabletStatsKeeper statsKeeper;
 +  
 +  /**
 +   * Base state that SessionManager keeps for each session it tracks.
 +   */
 +  private static class Session {
 +    // refreshed by SessionManager on creation, lookup and unreserve; compared against the idle limit in sweep()
 +    long lastAccessTime;
 +    // set in SessionManager.createSession()
 +    long startTime;
 +    String user;
 +    // taken from TServerUtils.clientAddress when the session object is constructed
 +    String client = TServerUtils.clientAddress.get();
 +    // while true, SessionManager.sweep() and removeIfNotAccessed() leave this session in place
 +    public boolean reserved;
 +    
 +    // hook called by SessionManager after it removes the session; does nothing here
 +    public void cleanup() {}
 +  }
 +  
 +  /**
 +   * Tracks the sessions open on this server, keyed by randomly generated session ids.
 +   * 
 +   * Every read or write of the session map happens while holding this object's monitor. Session.cleanup() is always called after the monitor has been
 +   * released. A periodic sweep scheduled by the constructor removes sessions that are idle and not reserved.
 +   */
 +  private static class SessionManager {
 +    
 +    SecureRandom random;
 +    Map<Long,Session> sessions;
 +    
 +    /**
 +     * @param conf
 +     *          supplies Property.TSERV_SESSION_MAXIDLE, the idle time after which an unreserved session is swept
 +     */
 +    SessionManager(AccumuloConfiguration conf) {
 +      random = new SecureRandom();
 +      sessions = new HashMap<Long,Session>();
 +      
 +      final long maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
 +      
 +      Runnable r = new Runnable() {
 +        @Override
 +        public void run() {
 +          sweep(maxIdle);
 +        }
 +      };
 +      
 +      // sweep twice per idle period, with a floor of 1000 on the period
 +      SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000));
 +    }
 +    
 +    /**
 +     * Registers a session under a fresh random id and stamps its start and last-access times.
 +     * 
 +     * @param session
 +     *          the session to register
 +     * @param reserve
 +     *          initial value of the session's reserved flag
 +     * @return the id under which the session was stored
 +     */
 +    synchronized long createSession(Session session, boolean reserve) {
 +      long sid = random.nextLong();
 +      
 +      // retry until the id does not collide with an existing session
 +      while (sessions.containsKey(sid)) {
 +        sid = random.nextLong();
 +      }
 +      
 +      sessions.put(sid, session);
 +      
 +      session.reserved = reserve;
 +      
 +      session.startTime = session.lastAccessTime = System.currentTimeMillis();
 +      
 +      return sid;
 +    }
 +    
 +    /**
 +     * Marks a session as reserved. While a session is reserved, neither sweep() nor removeIfNotAccessed() will remove it.
 +     * 
 +     * @param sessionId
 +     *          id returned by createSession
 +     * @return the reserved session, or null when no session has that id
 +     * @throws IllegalStateException
 +     *           if the session is already reserved
 +     */
 +    synchronized Session reserveSession(long sessionId) {
 +      Session session = sessions.get(sessionId);
 +      if (session != null) {
 +        if (session.reserved)
 +          throw new IllegalStateException();
 +        session.reserved = true;
 +      }
 +      
 +      return session;
 +      
 +    }
 +    
 +    /**
 +     * Clears the reserved flag and refreshes the last-access time.
 +     * 
 +     * @throws IllegalStateException
 +     *           if the session is not currently reserved
 +     */
 +    synchronized void unreserveSession(Session session) {
 +      if (!session.reserved)
 +        throw new IllegalStateException();
 +      session.reserved = false;
 +      session.lastAccessTime = System.currentTimeMillis();
 +    }
 +    
 +    /**
 +     * Unreserves the session with the given id; does nothing when no such session exists.
 +     */
 +    synchronized void unreserveSession(long sessionId) {
 +      Session session = getSession(sessionId);
 +      if (session != null)
 +        unreserveSession(session);
 +    }
 +    
 +    /**
 +     * Looks up a session and refreshes its last-access time.
 +     * 
 +     * @return the session, or null when no session has that id
 +     */
 +    synchronized Session getSession(long sessionId) {
 +      Session session = sessions.get(sessionId);
 +      if (session != null)
 +        session.lastAccessTime = System.currentTimeMillis();
 +      return session;
 +    }
 +    
 +    /**
 +     * Removes a session regardless of its reserved flag and runs its cleanup hook.
 +     * 
 +     * @return the removed session, or null when no session has that id
 +     */
 +    Session removeSession(long sessionId) {
 +      Session session = null;
 +      synchronized (this) {
 +        session = sessions.remove(sessionId);
 +      }
 +      
 +      // do clean up out side of lock..
 +      if (session != null)
 +        session.cleanup();
 +      
 +      return session;
 +    }
 +    
 +    /**
 +     * Removes every unreserved session that has been idle for longer than maxIdle and then runs the cleanup hook of each removed session.
 +     */
 +    private void sweep(long maxIdle) {
 +      ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
 +      synchronized (this) {
 +        Iterator<Session> iter = sessions.values().iterator();
 +        while (iter.hasNext()) {
 +          Session session = iter.next();
 +          long idleTime = System.currentTimeMillis() - session.lastAccessTime;
 +          if (idleTime > maxIdle && !session.reserved) {
 +            iter.remove();
 +            sessionsToCleanup.add(session);
 +          }
 +        }
 +      }
 +      
 +      // do clean up outside of lock
 +      for (Session session : sessionsToCleanup) {
 +        session.cleanup();
 +      }
 +    }
 +    
 +    /**
 +     * Schedules a one-shot task that removes the session after the given delay, but only if the session still exists, is unreserved, and its last-access
 +     * time is unchanged from the value observed when this method was called.
 +     * 
 +     * @param sessionId
 +     *          id of the session to watch
 +     * @param delay
 +     *          delay handed to SimpleTimer.schedule
 +     */
 +    synchronized void removeIfNotAccessed(final long sessionId, long delay) {
 +      Session session = sessions.get(sessionId);
 +      if (session != null) {
 +        final long removeTime = session.lastAccessTime;
 +        TimerTask r = new TimerTask() {
 +          @Override
 +          public void run() {
 +            Session sessionToCleanup = null;
 +            synchronized (SessionManager.this) {
 +              Session session2 = sessions.get(sessionId);
 +              if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) {
 +                sessions.remove(sessionId);
 +                sessionToCleanup = session2;
 +              }
 +            }
 +            
 +            // call clean up outside of lock
 +            if (sessionToCleanup != null)
 +              sessionToCleanup.cleanup();
 +          }
 +        };
 +        
 +        SimpleTimer.getInstance().schedule(r, delay);
 +      }
 +    }
 +    
 +    /**
 +     * Counts, per table id, the scan tasks of all scan and multi-scan sessions grouped by run state. Sessions without a task and tasks that are FINISHED are
 +     * not counted.
 +     * 
 +     * @return table id mapped to a counter of run states
 +     */
 +    public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
 +      Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
 +      for (Session session : sessions.values()) {
 +        
 +        ScanTask<?> nbt = null;
 +        String tableID = null;
 +        
 +        if (session instanceof ScanSession) {
 +          ScanSession ss = (ScanSession) session;
 +          nbt = ss.nextBatchTask;
 +          tableID = ss.extent.getTableId().toString();
 +        } else if (session instanceof MultiScanSession) {
 +          MultiScanSession mss = (MultiScanSession) session;
 +          nbt = mss.lookupTask;
 +          tableID = mss.threadPoolExtent.getTableId().toString();
 +        }
 +        
 +        // not a scan session, or a scan session with no task
 +        if (nbt == null)
 +          continue;
 +        
 +        ScanRunState srs = nbt.getScanRunState();
 +        
 +        if (srs == ScanRunState.FINISHED)
 +          continue;
 +        
 +        MapCounter<ScanRunState> stateCounts = counts.get(tableID);
 +        if (stateCounts == null) {
 +          stateCounts = new MapCounter<ScanRunState>();
 +          counts.put(tableID, stateCounts);
 +        }
 +        
 +        stateCounts.increment(srs, 1);
 +      }
 +      
 +      return counts;
 +    }
 +    
 +    /**
 +     * Maps a scan task to the state reported to clients: no task or a FINISHED task is IDLE, a QUEUED task is QUEUED, anything else is RUNNING.
 +     */
 +    private static ScanState toScanState(ScanTask<?> task) {
 +      if (task == null)
 +        return ScanState.IDLE;
 +      
 +      switch (task.getScanRunState()) {
 +        case QUEUED:
 +          return ScanState.QUEUED;
 +        case FINISHED:
 +          return ScanState.IDLE;
 +        case RUNNING:
 +        default:
 +          return ScanState.RUNNING;
 +      }
 +    }
 +    
 +    /**
 +     * Builds one ActiveScan description for every scan and multi-scan session currently tracked.
 +     * 
 +     * @return the descriptions, in the iteration order of the session map
 +     */
 +    public synchronized List<ActiveScan> getActiveScans() {
 +      
 +      ArrayList<ActiveScan> activeScans = new ArrayList<ActiveScan>();
 +      
 +      long ct = System.currentTimeMillis();
 +      
 +      for (Session session : sessions.values()) {
 +        if (session instanceof ScanSession) {
 +          ScanSession ss = (ScanSession) session;
 +          
 +          ScanState state = toScanState(ss.nextBatchTask);
 +          
 +          activeScans.add(new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
 +              state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translator.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
 +          
 +        } else if (session instanceof MultiScanSession) {
 +          MultiScanSession mss = (MultiScanSession) session;
 +          
 +          ScanState state = toScanState(mss.lookupTask);
 +          
 +          activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
 +              ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translator.CT), mss.ssiList, mss.ssio, mss.auths
 +                  .getAuthorizationsBB()));
 +        }
 +      }
 +      
 +      return activeScans;
 +    }
 +  }
 +  
 +  /**
 +   * Constraint Environment built from a caller's credentials. The extent is supplied through setExtent, and the authorizations are fetched from the
 +   * SecurityOperation the first time they are requested and then reused.
 +   */
 +  static class TservConstraintEnv implements Environment {
 +    
 +    private TCredentials credentials;
 +    private SecurityOperation security;
 +    private Authorizations auths;
 +    private KeyExtent ke;
 +    
 +    TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) {
 +      this.credentials = credentials;
 +      this.security = secOp;
 +    }
 +    
 +    /** Sets the extent returned by getExtent(). */
 +    void setExtent(KeyExtent ke) {
 +      this.ke = ke;
 +    }
 +    
 +    @Override
 +    public KeyExtent getExtent() {
 +      return this.ke;
 +    }
 +    
 +    @Override
 +    public String getUser() {
 +      return this.credentials.getPrincipal();
 +    }
 +    
 +    /**
 +     * @return the cached authorizations, looking them up first when nothing is cached yet
 +     * @throws RuntimeException
 +     *           wrapping the ThriftSecurityException raised by the lookup
 +     */
 +    @Override
 +    public Authorizations getAuthorizations() {
 +      if (this.auths != null) {
 +        return this.auths;
 +      }
 +      
 +      try {
 +        this.auths = this.security.getUserAuthorizations(this.credentials);
 +      } catch (ThriftSecurityException e) {
 +        throw new RuntimeException(e);
 +      }
 +      return this.auths;
 +    }
 +    
 +  }
 +  
 +  /**
 +   * Base class for scan work: a RunnableFuture whose single result (or failure) is passed to the waiting caller through a one-element queue. run() is not
 +   * implemented here, so concrete subclasses must supply it.
 +   * 
 +   * state moves from INITIAL to either ADDED (a result was published) or CANCELED. runState is tracked separately; this class only initializes it to QUEUED.
 +   */
 +  private abstract class ScanTask<T> implements RunnableFuture<T> {
 +    
 +    protected AtomicBoolean interruptFlag;
 +    protected ArrayBlockingQueue<Object> resultQueue;
 +    protected AtomicInteger state;
 +    protected AtomicReference<ScanRunState> runState;
 +    
 +    // values of the state field
 +    private static final int INITIAL = 1;
 +    private static final int ADDED = 2;
 +    private static final int CANCELED = 3;
 +    
 +    ScanTask() {
 +      interruptFlag = new AtomicBoolean(false);
 +      runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
 +      state = new AtomicInteger(INITIAL);
 +      resultQueue = new ArrayBlockingQueue<Object>(1);
 +    }
 +    
 +    /**
 +     * Publishes the task's result. A Throwable passed here is later rethrown by get(long, TimeUnit) wrapped in an ExecutionException. When the task has
 +     * already been canceled the value is silently dropped.
 +     * 
 +     * @throws IllegalStateException
 +     *           if a result was already added
 +     */
 +    protected void addResult(Object o) {
 +      if (state.compareAndSet(INITIAL, ADDED))
 +        resultQueue.add(o);
 +      else if (state.get() == ADDED)
 +        throw new IllegalStateException("Tried to add more than one result");
 +    }
 +    
 +    /**
 +     * Cancels the task if no result has been added yet, setting the interrupt flag and dropping the result queue.
 +     * 
 +     * @param mayInterruptIfRunning
 +     *          must be true
 +     * @return true if the task is canceled (now or previously), false if a result had already been added
 +     * @throws IllegalArgumentException
 +     *           if mayInterruptIfRunning is false
 +     */
 +    @Override
 +    public boolean cancel(boolean mayInterruptIfRunning) {
 +      if (!mayInterruptIfRunning)
 +        throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task");
 +      
 +      if (state.get() == CANCELED)
 +        return true;
 +      
 +      if (state.compareAndSet(INITIAL, CANCELED)) {
 +        interruptFlag.set(true);
 +        resultQueue = null;
 +        return true;
 +      }
 +      
 +      return false;
 +    }
 +    
 +    /**
 +     * Not supported; callers must use the timed variant.
 +     */
 +    @Override
 +    public T get() throws InterruptedException, ExecutionException {
 +      throw new UnsupportedOperationException();
 +    }
 +    
 +    /**
 +     * Waits up to the given time for the result. After a result has been returned or rethrown once, this method no longer works for this task.
 +     * 
 +     * @throws CancellationException
 +     *           if the task was canceled before or during the wait
 +     * @throws TimeoutException
 +     *           if no result arrived within the timeout
 +     * @throws ExecutionException
 +     *           wrapping the Throwable that was published as the result
 +     * @throws IllegalStateException
 +     *           if the result was already retrieved
 +     */
 +    @SuppressWarnings("unchecked")
 +    @Override
 +    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
 +      
 +      // work on a snapshot of the field: cancel() and a completed get() both set it to null
 +      ArrayBlockingQueue<Object> localRQ = resultQueue;
 +      
 +      if (state.get() == CANCELED)
 +        throw new CancellationException();
 +      
 +      if (localRQ == null && state.get() == ADDED)
 +        throw new IllegalStateException("Tried to get result twice");
 +      
 +      Object r = localRQ.poll(timeout, unit);
 +      
 +      // could have been canceled while waiting
 +      if (state.get() == CANCELED) {
 +        if (r != null)
 +          throw new IllegalStateException("Nothing should have been added when in canceled state");
 +        
 +        throw new CancellationException();
 +      }
 +      
 +      if (r == null)
 +        throw new TimeoutException();
 +      
 +      // make this method stop working now that something is being
 +      // returned
 +      resultQueue = null;
 +      
 +      if (r instanceof Throwable)
 +        throw new ExecutionException((Throwable) r);
 +      
 +      return (T) r;
 +    }
 +    
 +    @Override
 +    public boolean isCancelled() {
 +      return state.get() == CANCELED;
 +    }
 +    
 +    // reflects runState only; cancellation does not make this return true
 +    @Override
 +    public boolean isDone() {
 +      return runState.get().equals(ScanRunState.FINISHED);
 +    }
 +    
 +    public ScanRunState getScanRunState() {
 +      return runState.get();
 +    }
 +    
 +  }
 +  
 +  /**
 +   * Session subclass that carries the per-session state of an update: the tablet currently being written, queued mutations, failure maps and timing
 +   * statistics. It inherits the no-op cleanup() from Session.
 +   */
 +  private static class UpdateSession extends Session {
 +    public Tablet currentTablet;
 +    public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
 +    Map<KeyExtent,Long> failures = new HashMap<KeyExtent,Long>();
 +    HashMap<KeyExtent,SecurityErrorCode> authFailures = new HashMap<KeyExtent,SecurityErrorCode>();
 +    public Violations violations;
 +    public TCredentials credentials;
 +    public long totalUpdates = 0;
 +    public long flushTime = 0;
 +    // timing statistics; presumably one Stat per phase of applying updates -- confirm against the code that fills them
 +    Stat prepareTimes = new Stat();
 +    Stat walogTimes = new Stat();
 +    Stat commitTimes = new Stat();
 +    Stat authTimes = new Stat();
 +    public Map<Tablet,List<Mutation>> queuedMutations = new HashMap<Tablet,List<Mutation>>();
 +    public long queuedMutationSize = 0;
 +    TservConstraintEnv cenv = null;
 +  }
 +  
 +  /**
 +   * Session state for a scan over a single extent: the scan parameters, running statistics, the task producing the next batch and the underlying scanner.
 +   */
 +  private static class ScanSession extends Session {
 +    public KeyExtent extent;
 +    public HashSet<Column> columnSet;
 +    public List<IterInfo> ssiList;
 +    public Map<String,Map<String,String>> ssio;
 +    public Authorizations auths;
 +    public long entriesReturned = 0;
 +    public Stat nbTimes = new Stat();
 +    public long batchCount = 0;
 +    public volatile ScanTask<ScanBatch> nextBatchTask;
 +    public AtomicBoolean interruptFlag;
 +    public Scanner scanner;
 +    
 +    /**
 +     * Cancels the pending batch task, if any, and closes the scanner, if any. The scanner is closed even when cancelling throws.
 +     */
 +    @Override
 +    public void cleanup() {
 +      // read the volatile field once so the null check and the call act on the same task,
 +      // even if another thread clears the field in between
 +      final ScanTask<ScanBatch> task = nextBatchTask;
 +      try {
 +        if (task != null)
 +          task.cancel(true);
 +      } finally {
 +        if (scanner != null)
 +          scanner.close();
 +      }
 +    }
 +    
 +  }
 +  
 +  /**
 +   * Session state for a scan spanning several extents: the ranges requested per extent, the scan parameters, lookup statistics and the task performing the
 +   * current lookup.
 +   */
 +  private static class MultiScanSession extends Session {
 +    HashSet<Column> columnSet;
 +    Map<KeyExtent,List<Range>> queries;
 +    public List<IterInfo> ssiList;
 +    public Map<String,Map<String,String>> ssio;
 +    public Authorizations auths;
 +    
 +    // stats
 +    int numRanges;
 +    int numTablets;
 +    int numEntries;
 +    long totalLookupTime;
 +    
 +    public volatile ScanTask<MultiScanResult> lookupTask;
 +    public KeyExtent threadPoolExtent;
 +    
 +    /**
 +     * Cancels the pending lookup task, if any.
 +     */
 +    @Override
 +    public void cleanup() {
 +      // read the volatile field once so the null check and the call act on the same task,
 +      // even if another thread clears the field in between
 +      final ScanTask<MultiScanResult> task = lookupTask;
 +      if (task != null)
 +        task.cancel(true);
 +    }
 +  }
 +  
 +  /**
 +   * Records the writes currently in flight for each tablet type so that a reader can block until every write that began before the read has finished.
 +   * Operation ids are drawn from one shared counter and are assumed to increase monotonically.
 +   */
 +  static class WriteTracker {
 +    private static AtomicLong operationCounter = new AtomicLong(1);
 +    private Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
 +    
 +    WriteTracker() {
 +      // give every tablet type its own, initially empty, set of operation ids
 +      for (TabletType type : TabletType.values()) {
 +        inProgressWrites.put(type, new TreeSet<Long>());
 +      }
 +    }
 +    
 +    /**
 +     * Registers a new write against the given tablet type.
 +     * 
 +     * @return the operation id to pass to finishWrite
 +     */
 +    synchronized long startWrite(TabletType ttype) {
 +      final long id = operationCounter.getAndIncrement();
 +      final TreeSet<Long> active = inProgressWrites.get(ttype);
 +      active.add(id);
 +      return id;
 +    }
 +    
 +    /**
 +     * Marks a write as finished and wakes any waiting readers. An id of -1 (returned by startWrite for an empty tablet set) is ignored.
 +     * 
 +     * @throws IllegalArgumentException
 +     *           if the id is not registered under any tablet type
 +     */
 +    synchronized void finishWrite(long operationId) {
 +      if (operationId == -1)
 +        return;
 +      
 +      for (TabletType type : TabletType.values()) {
 +        if (inProgressWrites.get(type).remove(operationId)) {
 +          this.notifyAll();
 +          return;
 +        }
 +      }
 +      
 +      throw new IllegalArgumentException("Attempted to finish write not in progress,  operationId " + operationId);
 +    }
 +    
 +    /**
 +     * Blocks until no write of the given tablet type with an id at or below a freshly drawn operation id remains in progress. An interrupt while waiting is
 +     * logged and the wait continues.
 +     */
 +    synchronized void waitForWrites(TabletType ttype) {
 +      final long readId = operationCounter.getAndIncrement();
 +      final TreeSet<Long> active = inProgressWrites.get(ttype);
 +      while (active.floor(readId) != null) {
 +        try {
 +          this.wait();
 +        } catch (InterruptedException e) {
 +          log.error(e, e);
 +        }
 +      }
 +    }
 +    
 +    /**
 +     * Registers a write covering the given tablets, using the tablet type derived from their extents.
 +     * 
 +     * @return the operation id, or -1 when the set is empty and nothing was registered
 +     */
 +    public long startWrite(Set<Tablet> keySet) {
 +      if (keySet.isEmpty())
 +        return -1;
 +      
 +      final ArrayList<KeyExtent> tabletExtents = new ArrayList<KeyExtent>(keySet.size());
 +      for (Tablet member : keySet) {
 +        tabletExtents.add(member.getExtent());
 +      }
 +      
 +      return startWrite(TabletType.type(tabletExtents));
 +    }
 +  }
 +  
 +  /**
 +   * @return the configuration handed out by this server's {@code serverConfig}
 +   */
 +  public AccumuloConfiguration getSystemConfiguration() {
 +    final AccumuloConfiguration systemConfiguration = serverConfig.getConfiguration();
 +    return systemConfiguration;
 +  }
 +  
 +  TransactionWatcher watcher = new TransactionWatcher();
 +  
 +  private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
 +    
 +    SessionManager sessionManager;
 +    
 +    AccumuloConfiguration acuConf = getSystemConfiguration();
 +    
 +    TabletServerUpdateMetrics updateMetrics = new TabletServerUpdateMetrics();
 +    
 +    TabletServerScanMetrics scanMetrics = new TabletServerScanMetrics();
 +    
 +    WriteTracker writeTracker = new WriteTracker();
 +    
 +    ThriftClientHandler() {
 +      super(instance, watcher);
 +      log.debug(ThriftClientHandler.class.getName() + " created");
 +      sessionManager = new SessionManager(getSystemConfiguration());
 +      // Register the metrics MBean
 +      try {
 +        updateMetrics.register();
 +        scanMetrics.register();
 +      } catch (Exception e) {
 +        log.error("Exception registering MBean with MBean Server", e);
 +      }
 +    }
 +    
 +    /**
 +     * Imports the given map files into the tablets they are assigned to.
 +     * 
 +     * @return the extents whose files were not imported, either because the tablet is not online here or because the import failed with an IOException
 +     * @throws ThriftSecurityException
 +     *           if the credentials may not perform system actions
 +     */
 +    @Override
 +    public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
 +        throws ThriftSecurityException {
 +
 +      if (!security.canPerformSystemActions(credentials))
 +        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +      
 +      final List<TKeyExtent> notImported = new ArrayList<TKeyExtent>();
 +      
 +      for (Entry<TKeyExtent,Map<String,MapFileInfo>> assignment : files.entrySet()) {
 +        final TKeyExtent thriftExtent = assignment.getKey();
 +        final Map<String,MapFileInfo> mapFiles = assignment.getValue();
 +        
 +        final Tablet target = onlineTablets.get(new KeyExtent(thriftExtent));
 +        
 +        // tablet is not hosted here: report the extent and move on
 +        if (target == null) {
 +          notImported.add(thriftExtent);
 +          continue;
 +        }
 +        
 +        try {
 +          target.importMapFiles(tid, mapFiles, setTime);
 +        } catch (IOException ioe) {
 +          log.info("files " + mapFiles.keySet() + " not imported to " + new KeyExtent(thriftExtent) + ": " + ioe.getMessage());
 +          notImported.add(thriftExtent);
 +        }
 +      }
 +      return notImported;
 +    }
 +    
 +    /**
 +     * Read-ahead task that fetches the next batch of results for a single-tablet scan session and hands the batch, or the failure that prevented it, to the
 +     * waiting caller through addResult().
 +     */
 +    private class NextBatchTask extends ScanTask<ScanBatch> {
 +      
 +      // id of the scan session this task reads for; the session is looked up again when run() executes
 +      private long scanID;
 +      
 +      /**
 +       * @param scanID
 +       *          id of the scan session to read from
 +       * @param interruptFlag
 +       *          flag handed in by the caller; when it is already set the task cancels itself right away
 +       */
 +      NextBatchTask(long scanID, AtomicBoolean interruptFlag) {
 +        this.scanID = scanID;
 +        this.interruptFlag = interruptFlag;
 +        
 +        if (interruptFlag.get())
 +          cancel(true);
 +      }
 +      
 +      /**
 +       * Reads one batch from the session's scanner. Produces no result at all when the task was cancelled or the session no longer exists; otherwise adds
 +       * either the batch or the exception that occurred. The run state is set to FINISHED and the thread name is restored in every case.
 +       */
 +      @Override
 +      public void run() {
 +        
 +        final ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID);
 +        String oldThreadName = Thread.currentThread().getName();
 +        
 +        try {
 +          if (isCancelled() || scanSession == null)
 +            return;
 +          
 +          runState.set(ScanRunState.RUNNING);
 +          
 +          // expose who is scanning what through the thread name while the read is in progress
 +          Thread.currentThread().setName(
 +              "User: " + scanSession.user + " Start: " + scanSession.startTime + " Client: " + scanSession.client + " Tablet: " + scanSession.extent);
 +          
 +          Tablet tablet = onlineTablets.get(scanSession.extent);
 +          
 +          // the tablet is no longer online on this server
 +          if (tablet == null) {
 +            addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
 +            return;
 +          }
 +          
 +          // time the read and record it in the session's statistics
 +          long t1 = System.currentTimeMillis();
 +          ScanBatch batch = scanSession.scanner.read();
 +          long t2 = System.currentTimeMillis();
 +          scanSession.nbTimes.addStat(t2 - t1);
 +          
 +          // there should only be one thing on the queue at a time, so
 +          // it should be ok to call add()
 +          // instead of put()... if add() fails because queue is at
 +          // capacity it means there is code
 +          // problem somewhere
 +          addResult(batch);
 +        } catch (TabletClosedException e) {
 +          // a tablet closed during the read is reported the same way as a tablet that is not online
 +          addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
 +        } catch (IterationInterruptedException iie) {
 +          // an interruption is only reported when it was not caused by cancelling this task
 +          if (!isCancelled()) {
 +            log.warn("Iteration interrupted, when scan not cancelled", iie);
 +            addResult(iie);
 +          }
 +        } catch (TooManyFilesException tmfe) {
 +          addResult(tmfe);
 +        } catch (Throwable e) {
 +          log.warn("exception while scanning tablet " + (scanSession == null ? "(unknown)" : scanSession.extent), e);
 +          addResult(e);
 +        } finally {
 +          runState.set(ScanRunState.FINISHED);
 +          Thread.currentThread().setName(oldThreadName);
 +        }
 +        
 +      }
 +    }
 +    
 +    /**
 +     * Read-ahead task for a multi-scan session. Each run works through as many of the session's outstanding per-tablet range lists as fit into the size and
 +     * time budget and adds a single MultiScanResult (or the failure) through addResult().
 +     */
 +    private class LookupTask extends ScanTask<MultiScanResult> {
 +      
 +      // id of the multi-scan session this task works for; the session is looked up again when run() executes
 +      private long scanID;
 +      
 +      LookupTask(long scanID) {
 +        this.scanID = scanID;
 +      }
 +      
 +      /**
 +       * Performs lookups for the session until its queries are exhausted, the table's scan memory limit is reached or the time budget is used up. Produces no
 +       * result at all when the task was cancelled or the session no longer exists. The thread name is restored and the run state set to FINISHED in every
 +       * case.
 +       */
 +      @Override
 +      public void run() {
 +        MultiScanSession session = (MultiScanSession) sessionManager.getSession(scanID);
 +        String oldThreadName = Thread.currentThread().getName();
 +        
 +        try {
 +          if (isCancelled() || session == null)
 +            return;
 +          
 +          // the size limit for one result comes from the configuration of the table of threadPoolExtent
 +          TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(instance, session.threadPoolExtent.getTableId().toString());
 +          long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
 +          
 +          runState.set(ScanRunState.RUNNING);
 +          // NOTE(review): nothing is appended after "Table: "; the name is replaced per tablet inside the loop below
 +          Thread.currentThread().setName("Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: ");
 +          
 +          long bytesAdded = 0;
 +          // time budget for one run, in milliseconds (compared against currentTimeMillis differences below)
 +          long maxScanTime = 4000;
 +          
 +          long startTime = System.currentTimeMillis();
 +          
 +          ArrayList<KVEntry> results = new ArrayList<KVEntry>();
 +          // tablets that are not served here or were closed during the lookup, with the ranges that remain for them
 +          Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
 +          // tablets whose ranges were completely looked up in this run
 +          ArrayList<KeyExtent> fullScans = new ArrayList<KeyExtent>();
 +          // tablet whose lookup stopped part way, and the point at which it stopped
 +          KeyExtent partScan = null;
 +          Key partNextKey = null;
 +          boolean partNextKeyInclusive = false;
 +          
 +          Iterator<Entry<KeyExtent,List<Range>>> iter = session.queries.entrySet().iterator();
 +          
 +          // check the time so that the read ahead thread is not monopolized
 +          while (iter.hasNext() && bytesAdded < maxResultsSize && (System.currentTimeMillis() - startTime) < maxScanTime) {
 +            Entry<KeyExtent,List<Range>> entry = iter.next();
 +            
 +            // the entry is taken out of the session; unfinished ranges are put back further down
 +            iter.remove();
 +            
 +            // check that tablet server is serving requested tablet
 +            Tablet tablet = onlineTablets.get(entry.getKey());
 +            if (tablet == null) {
 +              failures.put(entry.getKey(), entry.getValue());
 +              continue;
 +            }
 +            Thread.currentThread().setName(
 +                "Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Tablet: " + entry.getKey().toString());
 +            
 +            LookupResult lookupResult;
 +            try {
 +              
 +              // do the following check to avoid a race condition
 +              // between setting false below and the task being
 +              // canceled
 +              if (isCancelled())
 +                interruptFlag.set(true);
 +              
 +              lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList,
 +                  session.ssio, interruptFlag);
 +              
 +              // if the tablet was closed it is possible that the
 +              // interrupt flag was set.... do not want it set for
 +              // the next
 +              // lookup
 +              interruptFlag.set(false);
 +              
 +            } catch (IOException e) {
 +              log.warn("lookup failed for tablet " + entry.getKey(), e);
 +              throw new RuntimeException(e);
 +            }
 +            
 +            bytesAdded += lookupResult.bytesAdded;
 +            
 +            if (lookupResult.unfinishedRanges.size() > 0) {
 +              if (lookupResult.closed) {
 +                // tablet was closed: hand the remaining ranges back to the client as failures
 +                failures.put(entry.getKey(), lookupResult.unfinishedRanges);
 +              } else {
 +                // keep the remaining ranges in the session and remember where the lookup stopped
 +                session.queries.put(entry.getKey(), lookupResult.unfinishedRanges);
 +                partScan = entry.getKey();
 +                partNextKey = lookupResult.unfinishedRanges.get(0).getStartKey();
 +                partNextKeyInclusive = lookupResult.unfinishedRanges.get(0).isStartKeyInclusive();
 +              }
 +            } else {
 +              fullScans.add(entry.getKey());
 +            }
 +          }
 +          
 +          // update the session statistics
 +          long finishTime = System.currentTimeMillis();
 +          session.totalLookupTime += (finishTime - startTime);
 +          session.numEntries += results.size();
 +          
 +          // convert everything to thrift before adding result
 +          List<TKeyValue> retResults = new ArrayList<TKeyValue>();
 +          for (KVEntry entry : results)
 +            retResults.add(new TKeyValue(entry.key.toThrift(), ByteBuffer.wrap(entry.value)));
 +          Map<TKeyExtent,List<TRange>> retFailures = Translator.translate(failures, Translator.KET, new Translator.ListTranslator<Range,TRange>(Translator.RT));
 +          List<TKeyExtent> retFullScans = Translator.translate(fullScans, Translator.KET);
 +          TKeyExtent retPartScan = null;
 +          TKey retPartNextKey = null;
 +          if (partScan != null) {
 +            retPartScan = partScan.toThrift();
 +            retPartNextKey = partNextKey.toThrift();
 +          }
 +          // add results to queue; the last argument is true while the session still has queries left
 +          addResult(new MultiScanResult(retResults, retFailures, retFullScans, retPartScan, retPartNextKey, partNextKeyInclusive, session.queries.size() != 0));
 +        } catch (IterationInterruptedException iie) {
 +          // an interruption is only reported when it was not caused by cancelling this task
 +          if (!isCancelled()) {
 +            log.warn("Iteration interrupted, when scan not cancelled", iie);
 +            addResult(iie);
 +          }
 +        } catch (Throwable e) {
 +          log.warn("exception while doing multi-scan ", e);
 +          addResult(e);
 +        } finally {
 +          Thread.currentThread().setName(oldThreadName);
 +          runState.set(ScanRunState.FINISHED);
 +        }
 +      }
 +    }
 +    
 +    /**
 +     * Starts a scan over one tablet: checks permission and authorizations, optionally waits for in-flight writes, creates a scan session and returns the
 +     * session id together with the first batch of results.
 +     * 
 +     * @throws ThriftSecurityException
 +     *           if the user may not scan the table or asks for authorizations the user does not have
 +     * @throws NotServingTabletException
 +     *           if the tablet is not online on this server
 +     */
 +    @Override
 +    public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize,
 +        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated)
 +        throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
 +      
 +      if (!security.canScan(credentials, new String(textent.getTable())))
 +        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +      
 +      // every requested authorization must be one the user actually holds
 +      Authorizations userauths = security.getUserAuthorizations(credentials);
 +      for (ByteBuffer auth : authorizations)
 +        if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
 +          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
 +      
 +      KeyExtent extent = new KeyExtent(textent);
 +      
 +      // wait for any writes that are in flight.. this done to ensure
 +      // consistency across client restarts... assume a client writes
 +      // to accumulo and dies while waiting for a confirmation from
 +      // accumulo... the client process restarts and tries to read
 +      // data from accumulo making the assumption that it will get
 +      // any writes previously made... however if the server side thread
 +      // processing the write from the dead client is still in progress,
 +      // the restarted client may not see the write unless we wait here.
 +      // this behavior is very important when the client is reading the
 +      // !METADATA table
 +      if (waitForWrites)
 +        writeTracker.waitForWrites(TabletType.type(extent));
 +      
 +      Tablet tablet = onlineTablets.get(extent);
 +      if (tablet == null)
 +        throw new NotServingTabletException(textent);
 +      
 +      ScanSession scanSession = new ScanSession();
 +      scanSession.user = credentials.getPrincipal();
 +      scanSession.extent = new KeyExtent(extent);
 +      scanSession.columnSet = new HashSet<Column>();
 +      scanSession.ssiList = ssiList;
 +      scanSession.ssio = ssio;
 +      scanSession.auths = new Authorizations(authorizations);
 +      scanSession.interruptFlag = new AtomicBoolean();
 +      
 +      for (TColumn tcolumn : columns) {
 +        scanSession.columnSet.add(new Column(tcolumn));
 +      }
 +      
 +      scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet, scanSession.auths, ssiList, ssio, isolated,
 +          scanSession.interruptFlag);
 +      
 +      // the session is created reserved and is unreserved below once the first batch has been fetched
 +      long sid = sessionManager.createSession(scanSession, true);
 +      
 +      ScanResult scanResult;
 +      try {
 +        scanResult = continueScan(tinfo, sid, scanSession);
 +      } catch (NoSuchScanIDException e) {
 +        log.error("The impossible happened", e);
 +        // keep message and cause so the failure can be diagnosed from the rethrown exception as well
 +        throw new RuntimeException("The impossible happened", e);
 +      } finally {
 +        sessionManager.unreserveSession(sid);
 +      }
 +      
 +      return new InitialScan(sid, scanResult);
 +    }
 +    
 +    /**
 +     * Fetches the next batch for an existing scan. The session is reserved for the duration of the call and unreserved afterwards, also on failure.
 +     * 
 +     * @throws NoSuchScanIDException
 +     *           if no session can be reserved for the given id
 +     */
 +    @Override
 +    public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException,
 +        org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
 +      final ScanSession reserved = (ScanSession) sessionManager.reserveSession(scanID);
 +      if (reserved == null)
 +        throw new NoSuchScanIDException();
 +      
 +      final ScanResult nextBatch;
 +      try {
 +        nextBatch = continueScan(tinfo, scanID, reserved);
 +      } finally {
 +        sessionManager.unreserveSession(reserved);
 +      }
 +      return nextBatch;
 +    }
 +    
 +    /**
 +     * Waits for the next batch of an already reserved scan session and converts it into a ScanResult.
 +     * 
 +     * When the batch does not arrive within MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS an empty result with more set to true is returned. Every other failure
 +     * removes the session before the exception is propagated. When the scan has no more data the session is closed.
 +     */
 +    private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession) throws NoSuchScanIDException, NotServingTabletException,
 +        org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
 +      
 +      // no read ahead is outstanding, so start one now
 +      if (scanSession.nextBatchTask == null) {
 +        scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
 +        resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
 +      }
 +      
 +      ScanBatch bresult;
 +      try {
 +        bresult = scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
 +        // the task delivered its result and is done
 +        scanSession.nextBatchTask = null;
 +      } catch (ExecutionException e) {
 +        sessionManager.removeSession(scanID);
 +        // map the failure reported by the task onto the exceptions declared by this method
 +        if (e.getCause() instanceof NotServingTabletException)
 +          throw (NotServingTabletException) e.getCause();
 +        else if (e.getCause() instanceof TooManyFilesException)
 +          throw new org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(scanSession.extent.toThrift());
 +        else
 +          throw new RuntimeException(e);
 +      } catch (CancellationException ce) {
 +        sessionManager.removeSession(scanID);
 +        // a cancelled task is reported as "not serving" when the tablet is gone or closed, otherwise as an unknown scan
 +        Tablet tablet = onlineTablets.get(scanSession.extent);
 +        if (tablet == null || tablet.isClosed())
 +          throw new NotServingTabletException(scanSession.extent.toThrift());
 +        else
 +          throw new NoSuchScanIDException();
 +      } catch (TimeoutException e) {
 +        // the batch is not ready yet: answer with an empty result that says more is to come; the task is left in place
 +        List<TKeyValue> param = Collections.emptyList();
 +        long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
 +        sessionManager.removeIfNotAccessed(scanID, timeout);
 +        return new ScanResult(param, true);
 +      } catch (Throwable t) {
 +        sessionManager.removeSession(scanID);
 +        log.warn("Failed to get next batch", t);
 +        throw new RuntimeException(t);
 +      }
 +      
 +      ScanResult scanResult = new ScanResult(Key.compress(bresult.results), bresult.more);
 +      
 +      scanSession.entriesReturned += scanResult.results.size();
 +      
 +      scanSession.batchCount++;
 +      
 +      // read ahead is only started once more than three batches have been returned
 +      if (scanResult.more && scanSession.batchCount > 3) {
 +        // start reading next batch while current batch is transmitted
 +        // to client
 +        scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
 +        resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
 +      }
 +      
 +      // nothing left to read, so the session can go
 +      if (!scanResult.more)
 +        closeScan(tinfo, scanID);
 +      
 +      return scanResult;
 +    }
 +    
 +    /**
 +     * Removes the scan session with the given id, logs a summary of the scan and, when scan metrics are enabled, records its duration and result size. Does
 +     * nothing when no session is removed.
 +     */
 +    @Override
 +    public void closeScan(TInfo tinfo, long scanID) {
 +      final ScanSession closed = (ScanSession) sessionManager.removeSession(scanID);
 +      if (closed == null)
 +        return;
 +      
 +      final long now = System.currentTimeMillis();
 +      
 +      final String summary = String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ", TServerUtils.clientAddress.get(), closed.extent
 +          .getTableId().toString(), closed.entriesReturned, (now - closed.startTime) / 1000.0, closed.nbTimes.toString());
 +      log.debug(summary);
 +      
 +      if (scanMetrics.isEnabled()) {
 +        scanMetrics.add(TabletServerScanMetrics.scan, now - closed.startTime);
 +        scanMetrics.add(TabletServerScanMetrics.resultSize, closed.entriesReturned);
 +      }
 +    }
 +    
 +    /**
 +     * Starts a scan over several tablets: checks permission on every table involved and the requested authorizations, optionally waits for in-flight writes,
 +     * creates a multi-scan session and returns the session id together with the first result.
 +     * 
 +     * @throws ThriftSecurityException
 +     *           if the user may not scan one of the tables or asks for authorizations the user does not have
 +     * @throws IllegalArgumentException
 +     *           if the batch combines the root tablet with other tablets, or !METADATA tablets with tablets of other tables
 +     */
 +    @Override
 +    public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns,
 +        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws ThriftSecurityException {
 +      // find all of the tables that need to be scanned
 +      HashSet<String> tables = new HashSet<String>();
 +      for (TKeyExtent keyExtent : tbatch.keySet()) {
 +        tables.add(new String(keyExtent.getTable()));
 +      }
 +      
 +      // check if user has permission to the tables
 +      Authorizations userauths = null;
 +      for (String table : tables)
 +        if (!security.canScan(credentials, table))
 +          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +      
 +      // every requested authorization must be one the user actually holds
 +      userauths = security.getUserAuthorizations(credentials);
 +      for (ByteBuffer auth : authorizations)
 +        if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
 +          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
 +      
 +      // first extent of the batch; later handed to the resource manager when lookup tasks are submitted
 +      KeyExtent threadPoolExtent = null;
 +      
 +      Map<KeyExtent,List<Range>> batch = Translator.translate(tbatch, Translator.TKET, new Translator.ListTranslator<TRange,Range>(Translator.RT == null ? null : Translator.TRT));
 +      
 +      for (KeyExtent keyExtent : batch.keySet()) {
 +        if (threadPoolExtent == null) {
 +          threadPoolExtent = keyExtent;
 +        } else if (keyExtent.isRootTablet()) {
 +          throw new IllegalArgumentException("Cannot batch query root tablet with other tablets " + threadPoolExtent + " " + keyExtent);
 +        } else if (keyExtent.isMeta() && !threadPoolExtent.isMeta()) {
 +          throw new IllegalArgumentException("Cannot batch query !METADATA and non !METADATA tablets " + threadPoolExtent + " " + keyExtent);
 +        }
 +        
 +      }
 +      
 +      if (waitForWrites)
 +        writeTracker.waitForWrites(TabletType.type(batch.keySet()));
 +      
 +      MultiScanSession mss = new MultiScanSession();
 +      mss.user = credentials.getPrincipal();
 +      mss.queries = batch;
 +      mss.columnSet = new HashSet<Column>(tcolumns.size());
 +      mss.ssiList = ssiList;
 +      mss.ssio = ssio;
 +      mss.auths = new Authorizations(authorizations);
 +      
 +      // statistics that are logged when the session is closed
 +      mss.numTablets = batch.size();
 +      for (List<Range> ranges : batch.values()) {
 +        mss.numRanges += ranges.size();
 +      }
 +      
 +      for (TColumn tcolumn : tcolumns)
 +        mss.columnSet.add(new Column(tcolumn));
 +      
 +      mss.threadPoolExtent = threadPoolExtent;
 +      
 +      // the session is created reserved and is unreserved below once the first result has been fetched
 +      long sid = sessionManager.createSession(mss, true);
 +      
 +      MultiScanResult result;
 +      try {
 +        result = continueMultiScan(tinfo, sid, mss);
 +      } catch (NoSuchScanIDException e) {
 +        log.error("the impossible happened", e);
 +        throw new RuntimeException("the impossible happened", e);
 +      } finally {
 +        sessionManager.unreserveSession(sid);
 +      }
 +      
 +      return new InitialMultiScan(sid, result);
 +    }
 +    
 +    /**
 +     * Fetches the next result of an existing multi-scan. The session is reserved for the duration of the call and unreserved afterwards, also on failure.
 +     * 
 +     * @throws NoSuchScanIDException
 +     *           if no session can be reserved for the given id
 +     */
 +    @Override
 +    public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
 +      final MultiScanSession reserved = (MultiScanSession) sessionManager.reserveSession(scanID);
 +      if (reserved == null)
 +        throw new NoSuchScanIDException();
 +      
 +      final MultiScanResult nextResult;
 +      try {
 +        nextResult = continueMultiScan(tinfo, scanID, reserved);
 +      } finally {
 +        sessionManager.unreserveSession(reserved);
 +      }
 +      return nextResult;
 +    }
 +    
 +    private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session) throws NoSuchScanIDException {
 +      
 +      if (session.lookupTask == null) {
 +        session.lookupTask = new LookupTask(scanID);
 +        resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask);
 +      }
 +      
 +      try {
 +        MultiScanResult scanResult = session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
 +        session.lookupTask = null;
 +        return scanResult;
 +      } catch (TimeoutException e1) {
 +        long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
 +        sessionManager.removeIfNotAccessed(scanID, timeout);
 +        List<TKeyValue> results = Collections.emptyList();
 +        Map<TKeyExtent,List<TRange>> failures = Collections.emptyMap();
 +        List<TKeyExtent> fullScans = Collections.emptyList();
 +        return new MultiScanResult(results, failures, fullScans, null, null, false, true);
 +      } catch (Throwable t) {
 +        sessionManager.removeSession(scanID);
 +        log.warn("Failed to get multiscan result", t);
 +        throw new RuntimeException(t);
 +      }
 +    }
 +    
 +    /**
 +     * Removes the multi-scan session with the given id and logs a summary of the scan.
 +     * 
 +     * @throws NoSuchScanIDException
 +     *           if no session is removed for the given id
 +     */
 +    @Override
 +    public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
 +      final MultiScanSession closed = (MultiScanSession) sessionManager.removeSession(scanID);
 +      if (closed == null)
 +        throw new NoSuchScanIDException();
 +      
 +      final long now = System.currentTimeMillis();
 +      final String summary = String.format("MultiScanSess %s %,d entries in %.2f secs (lookup_time:%.2f secs tablets:%,d ranges:%,d) ",
 +          TServerUtils.clientAddress.get(), closed.numEntries, (now - closed.startTime) / 1000.0, closed.totalLookupTime / 1000.0, closed.numTablets,
 +          closed.numRanges);
 +      log.debug(summary);
 +    }
 +    
 +    /**
 +     * Authenticates the user and opens an update session for subsequent applyUpdates calls.
 +     * 
 +     * @return the id of the new update session
 +     */
 +    @Override
 +    public long startUpdate(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException {
 +      // Make sure user is real
 +      security.authenticateUser(credentials, credentials);
 +      if (updateMetrics.isEnabled())
 +        updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
 +      
 +      final UpdateSession session = new UpdateSession();
 +      session.violations = new Violations();
 +      session.credentials = credentials;
 +      session.cenv = new TservConstraintEnv(security, session.credentials);
 +      
 +      return sessionManager.createSession(session, false);
 +    }
 +    
 +    private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
 +      long t1 = System.currentTimeMillis();
 +      if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent))
 +        return;
 +      if (us.currentTablet == null && (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) {
 +        // if there were previous failures, then do not accept additional writes
 +        return;
 +      }
 +      
 +      try {
 +        // if user has no permission to write to this table, add it to
 +        // the failures list
 +        boolean sameTable = us.currentTablet != null && (us.currentTablet.getExtent().getTableId().equals(keyExtent.getTableId()));
 +        if (sameTable || security.canWrite(us.credentials, keyExtent.getTableId().toString())) {
 +          long t2 = System.currentTimeMillis();
 +          us.authTimes.addStat(t2 - t1);
 +          us.currentTablet = onlineTablets.get(keyExtent);
 +          if (us.currentTablet != null) {
 +            us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
 +          } else {
 +            // not serving tablet, so report all mutations as
 +            // failures
 +            us.failures.put(keyExtent, 0l);
 +            if (updateMetrics.isEnabled())
 +              updateMetrics.add(TabletServerUpdateMetrics.unknownTabletErrors, 0);
 +          }
 +        } else {
 +          log.warn("Denying access to table " + keyExtent.getTableId() + " for user " + us.credentials.getPrincipal());
 +          long t2 = System.currentTimeMillis();
 +          us.authTimes.addStat(t2 - t1);
 +          us.currentTablet = null;
 +          us.authFailures.put(keyExtent, SecurityErrorCode.PERMISSION_DENIED);
 +          if (updateMetrics.isEnabled())
 +            updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
 +          return;
 +        }
 +      } catch (ThriftSecurityException e) {
 +        log.error("Denying permission to check user " + us.credentials.getPrincipal() + " with user " + e.getUser(), e);
 +        long t2 = System.currentTimeMillis();
 +        us.authTimes.addStat(t2 - t1);
 +        us.currentTablet = null;
 +        us.authFailures.put(keyExtent, e.getCode());
 +        if (updateMetrics.isEnabled())
 +          updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
 +        return;
 +      }
 +    }
 +    
 +    /**
 +     * Queues the given mutations for the given tablet in the update session and flushes the session once the queued size exceeds
 +     * TSERV_MUTATION_QUEUE_MAX. Mutations are dropped when the session has no current tablet after the tablet has been selected.
 +     * 
 +     * @throws RuntimeException
 +     *           if no session can be reserved for the given id
 +     */
 +    @Override
 +    public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent, List<TMutation> tmutations) {
 +      final UpdateSession session = (UpdateSession) sessionManager.reserveSession(updateID);
 +      if (session == null)
 +        throw new RuntimeException("No Such SessionID");
 +      
 +      try {
 +        setUpdateTablet(session, new KeyExtent(tkeyExtent));
 +        
 +        // no current tablet: the extent was rejected, nothing gets queued
 +        if (session.currentTablet == null)
 +          return;
 +        
 +        final List<Mutation> queued = session.queuedMutations.get(session.currentTablet);
 +        for (TMutation thriftMutation : tmutations) {
 +          final Mutation serverMutation = new ServerMutation(thriftMutation);
 +          queued.add(serverMutation);
 +          session.queuedMutationSize += serverMutation.numBytes();
 +        }
 +        
 +        if (session.queuedMutationSize > getSystemConfiguration().getMemoryInBytes(Property.TSERV_MUTATION_QUEUE_MAX))
 +          flush(session);
 +      } finally {
 +        sessionManager.unreserveSession(session);
 +      }
 +    }
 +    
 +    private void flush(UpdateSession us) {
 +      
 +      int mutationCount = 0;
 +      Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
 +      Throwable error = null;
 +      
 +      long pt1 = System.currentTimeMillis();
 +      
 +      boolean containsMetadataTablet = false;
 +      for (Tablet tablet : us.queuedMutations.keySet())
 +        if (tablet.getExtent().isMeta())
 +          containsMetadataTablet = true;
 +      
 +      if (!containsMetadataTablet && us.queuedMutations.size() > 0)
 +        TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
 +      
 +      Span prep = Trace.start("prep");
 +      for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
 +        
 +        Tablet tablet = entry.getKey();
 +        List<Mutation> mutations = entry.getValue();
 +        if (mutations.size() > 0) {
 +          try {
 +            if (updateMetrics.isEnabled())
 +              updateMetrics.add(TabletServerUpdateMetrics.mutationArraySize, mutations.size());
 +            
 +            CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations);
 +            if (commitSession == null) {
 +              if (us.currentTablet == tablet) {
 +                us.currentTablet = null;
 +              }
 +              us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
 +            } else {
 +              sendables.put(commitSession, mutations);
 +              mutationCount += mutations.size();
 +            }
 +            
 +          } catch (TConstraintViolationException e) {
 +            us.violations.add(e.getViolations());
 +            if (updateMetrics.isEnabled())
 +              updateMetrics.add(TabletServerUpdateMetrics.constraintViolations, 0);
 +            
 +            if (e.getNonViolators().size() > 0) {
 +              // only log and commit mutations if there were some
 +              // that did not
 +              // violate constraints... this is what
 +              // prepareMutationsForCommit()
 +              // expects
 +              sendables.put(e.getCommitSession(), e.getNonViolators());
 +            }
 +            
 +            mutationCount += mutations.size();
 +            
 +          } catch (HoldTimeoutException t) {
 +            error = t;
 +            log.debug("Giving up on mutations due to a long memory hold time");
 +            break;
 +          } catch (Throwable t) {
 +            error = t;
 +            log.error("Unexpected error preparing for commit", error);
 +            break;
 +          }
 +        }
 +      }
 +      prep.stop();
 +      
 +      Span wal = Trace.start("wal");
 +      long pt2 = System.currentTimeMillis();
 +      long avgPrepareTime = (long) ((pt2 - pt1) / (double) us.queuedMutations.size());
 +      us.prepareTimes.addStat(pt2 - pt1);
 +      if (updateMetrics.isEnabled())
 +        updateMetrics.add(TabletServerUpdateMetrics.commitPrep, (avgPrepareTime));
 +      
 +      if (error != null) {
 +        for (Entry<CommitSession,List<Mutation>> e : sendables.entrySet()) {
 +          e.getKey().abortCommit(e.getValue());
 +        }
 +        throw new RuntimeException(error);
 +      }
 +      try {
 +        while (true) {
 +          try {
 +            long t1 = System.currentTimeMillis();
 +            
 +            logger.logManyTablets(sendables);
 +            
 +            long t2 = System.currentTimeMillis();
 +            us.walogTimes.addStat(t2 - t1);
 +            if (updateMetrics.isEnabled())
 +              updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, (t2 - t1));
 +            
 +            break;
 +          } catch (IOException ex) {
 +            log.warn("logging mutations failed, retrying");
 +          } catch (Throwable t) {
 +            log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t);
 +            throw new RuntimeException(t);
 +          }
 +        }
 +        
 +        wal.stop();
 +        
 +        Span commit = Trace.start("commit");
 +        long t1 = System.currentTimeMillis();
 +        for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
 +          CommitSession commitSession = entry.getKey();
 +          List<Mutation> mutations = entry.getValue();
 +          
 +          commitSession.commit(mutations);
 +          
 +          Tablet tablet = commitSession.getTablet();
 +          
 +          if (tablet == us.currentTablet) {
 +            // because constraint violations may filter out some
 +            // mutations, for proper
 +            // accounting with the client code, need to increment
 +            // the count based
 +            // on the original number of mutations from the client
 +            // NOT the filtered number
 +            us.successfulCommits.increment(tablet, us.queuedMutations.get(tablet).size());
 +          }
 +        }
 +        long t2 = System.currentTimeMillis();
 +        
 +        long avgCommitTime = (long) ((t2 - t1) / (double) sendables.size());
 +        
 +        us.flushTime += (t2 - pt1);
 +        us.commitTimes.addStat(t2 - t1);
 +        
 +        if (updateMetrics.isEnabled())
 +          updateMetrics.add(TabletServerUpdateMetrics.commitTime, avgCommitTime);
 +        commit.stop();
 +      } finally {
 +        us.queuedMutations.clear();
 +        if (us.currentTablet != null) {
 +          us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
 +        }
 +        us.queuedMutationSize = 0;
 +      }
 +      us.totalUpdates += mutationCount;
 +    }
 +    
 +    /**
 +     * Closes an update session: removes it from the session manager, flushes any mutations still queued on it, logs a summary of the session and returns
 +     * the failures, constraint violations and authorization failures that were accumulated.
 +     *
 +     * @throws NoSuchScanIDException
 +     *           if no session is registered under {@code updateID}
 +     */
 +    @Override
 +    public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException {
 +      UpdateSession session = (UpdateSession) sessionManager.removeSession(updateID);
 +      if (session == null)
 +        throw new NoSuchScanIDException();
 +
 +      // clients may or may not see data from an update session while it is in
 +      // progress; once the session is closed, reads must wait for the final
 +      // write to finish, so register the write before flushing
 +      long writeId = writeTracker.startWrite(session.queuedMutations.keySet());
 +      try {
 +        flush(session);
 +      } finally {
 +        writeTracker.finishWrite(writeId);
 +      }
 +
 +      String summary = String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)", TServerUtils.clientAddress.get(),
 +          session.totalUpdates, (System.currentTimeMillis() - session.startTime) / 1000.0, session.authTimes.toString(), session.flushTime / 1000.0,
 +          session.prepareTimes.getSum() / 1000.0, session.walogTimes.getSum() / 1000.0, session.commitTimes.getSum() / 1000.0);
 +      log.debug(summary);
 +
 +      if (!session.failures.isEmpty()) {
 +        Entry<KeyExtent,Long> firstFailure = session.failures.entrySet().iterator().next();
 +        log.debug(String.format("Failures: %d, first extent %s successful commits: %d", session.failures.size(), firstFailure.getKey().toString(),
 +            firstFailure.getValue()));
 +      }
 +
 +      List<ConstraintViolationSummary> violations = session.violations.asList();
 +      if (!violations.isEmpty()) {
 +        ConstraintViolationSummary firstViolation = violations.get(0);
 +        log.debug(String.format("Violations: %d, first %s occurs %d", violations.size(), firstViolation.violationDescription,
 +            firstViolation.numberOfViolatingMutations));
 +      }
 +
 +      if (!session.authFailures.isEmpty()) {
 +        KeyExtent firstAuthFailure = session.authFailures.keySet().iterator().next();
 +        log.debug(String.format("Authentication Failures: %d, first %s", session.authFailures.size(), firstAuthFailure.toString()));
 +      }
 +
 +      return new UpdateErrors(Translator.translate(session.failures, Translator.KET), Translator.translate(violations, Translator.CVST),
 +          Translator.translate(session.authFailures, Translator.KET));
 +    }
 +    
 +    /**
 +     * Applies a single mutation to one tablet synchronously: verifies write permission on the table, prepares the mutation against the tablet, writes it to
 +     * the write-ahead log (retrying for as long as the log write fails with an IOException) and finally commits it.
 +     *
 +     * @throws ThriftSecurityException
 +     *           if the credentials may not write to the table
 +     * @throws NotServingTabletException
 +     *           if the tablet is not online here, or no commit session could be obtained for it
 +     * @throws ConstraintViolationException
 +     *           if preparing the mutation reported constraint violations
 +     */
 +    @Override
 +    public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, TMutation tmutation) throws NotServingTabletException,
 +        ConstraintViolationException, ThriftSecurityException {
 +
 +      // decode the table id explicitly as UTF-8 instead of relying on the
 +      // platform default charset
 +      String tableId = new String(tkeyExtent.getTable(), java.nio.charset.Charset.forName("UTF-8"));
 +      if (!security.canWrite(credentials, tableId))
 +        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +      KeyExtent keyExtent = new KeyExtent(tkeyExtent);
 +      Tablet tablet = onlineTablets.get(keyExtent);
 +      if (tablet == null) {
 +        throw new NotServingTabletException(tkeyExtent);
 +      }
 +
 +      if (!keyExtent.isMeta())
 +        TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
 +
 +      long opid = writeTracker.startWrite(TabletType.type(keyExtent));
 +
 +      try {
 +        Mutation mutation = new ServerMutation(tmutation);
 +        List<Mutation> mutations = Collections.singletonList(mutation);
 +
 +        // every span is stopped in a finally block so that an exception
 +        // (e.g. a constraint violation) cannot leave it open
 +        CommitSession cs;
 +        Span prep = Trace.start("prep");
 +        try {
 +          cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials), mutations);
 +        } finally {
 +          prep.stop();
 +        }
 +        if (cs == null) {
 +          throw new NotServingTabletException(tkeyExtent);
 +        }
 +
 +        // keep retrying the write-ahead log until it succeeds; each attempt
 +        // gets its own span, which is closed whether or not the attempt failed
 +        while (true) {
 +          Span wal = Trace.start("wal");
 +          try {
 +            logger.log(cs, cs.getWALogSeq(), mutation);
 +            break;
 +          } catch (IOException ex) {
 +            log.warn(ex, ex);
 +          } finally {
 +            wal.stop();
 +          }
 +        }
 +
 +        Span commit = Trace.start("commit");
 +        try {
 +          cs.commit(mutations);
 +        } finally {
 +          commit.stop();
 +        }
 +      } catch (TConstraintViolationException e) {
 +        throw new ConstraintViolationException(Translator.translate(e.getViolations().asList(), Translator.CVST));
 +      } finally {
 +        writeTracker.finishWrite(opid);
 +      }
 +    }
 +    
 +    /**
 +     * Splits the given online tablet at {@code splitPoint}, unless the tablet's end row already equals the split point.
 +     *
 +     * @throws ThriftSecurityException
 +     *           if the credentials may not split tablets of the table
 +     * @throws NotServingTabletException
 +     *           if the tablet is not online here, or the split attempt returned null
 +     */
 +    @Override
 +    public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint)
 +        throws NotServingTabletException, ThriftSecurityException {
 +
 +      // decode the table id explicitly as UTF-8 instead of relying on the
 +      // platform default charset
 +      String tableId = new String(ByteBufferUtil.toBytes(tkeyExtent.table), java.nio.charset.Charset.forName("UTF-8"));
 +      if (!security.canSplitTablet(credentials, tableId))
 +        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +
 +      KeyExtent keyExtent = new KeyExtent(tkeyExtent);
 +
 +      Tablet tablet = onlineTablets.get(keyExtent);
 +      if (tablet == null) {
 +        throw new NotServingTabletException(tkeyExtent);
 +      }
 +
 +      // nothing to do when the tablet already ends exactly at the split point
 +      if (keyExtent.getEndRow() == null || !keyExtent.getEndRow().equals(ByteBufferUtil.toText(splitPoint))) {
 +        try {
 +          if (TabletServer.this.splitTablet(tablet, ByteBufferUtil.toBytes(splitPoint)) == null) {
 +            throw new NotServingTabletException(tkeyExtent);
 +          }
 +        } catch (IOException e) {
 +          log.warn("Failed to split " + keyExtent, e);
 +          throw new RuntimeException(e);
 +        }
 +      }
 +    }
 +    
 +    @Override
 +    public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
 +      return getStats(sessionManager.getActiveScansPerTable());
 +    }
 +    
 +    /**
 +     * Collects statistics for every online tablet whose table id equals {@code tableId}. Works on a snapshot of the online tablets taken under the
 +     * {@code onlineTablets} lock.
 +     */
 +    @Override
 +    public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) throws ThriftSecurityException, TException {
 +      TreeMap<KeyExtent,Tablet> snapshot;
 +      synchronized (onlineTablets) {
 +        snapshot = new TreeMap<KeyExtent,Tablet>(onlineTablets);
 +      }
 +
 +      List<TabletStats> collected = new ArrayList<TabletStats>();
 +      Text wantedTable = new Text(tableId);
 +      // only entries at or after this key are inspected
 +      KeyExtent lowerBound = new KeyExtent(wantedTable, new Text(), null);
 +
 +      for (Entry<KeyExtent,Tablet> candidate : snapshot.tailMap(lowerBound).entrySet()) {
 +        KeyExtent extent = candidate.getKey();
 +        if (extent.getTableId().compareTo(wantedTable) != 0)
 +          continue;
 +
 +        Tablet tablet = candidate.getValue();
 +        TabletStats tabletStats = tablet.timer.getTabletStats();
 +        tabletStats.extent = extent.toThrift();
 +        tabletStats.ingestRate = tablet.ingestRate();
 +        tabletStats.queryRate = tablet.queryRate();
 +        tabletStats.splitCreationTime = tablet.getSplitCreationTime();
 +        tabletStats.numEntries = tablet.getNumEntries();
 +        collected.add(tabletStats);
 +      }
 +      return collected;
 +    }
 +    
 +    // cache consulted (and cleared on a miss) by checkPermission() when verifying the master lock
 +    private ZooCache masterLockCache = new ZooCache();
 +    
 +    /**
 +     * Validates an administrative request before it is carried out. Three checks are made in order: the caller's system permission (only when
 +     * {@code requiresSystemPermission} is set), the state of this server's own lock, and (only when {@code lock} is non-null) whether the given master
 +     * lock is currently held.
 +     * 
 +     * @param credentials
 +     *          credentials of the caller
 +     * @param lock
 +     *          master lock to verify, or null to skip the master lock check
 +     * @param requiresSystemPermission
 +     *          whether the caller must be allowed to perform system actions
 +     * @param request
 +     *          name of the request, used only in log messages
 +     * @throws ThriftSecurityException
 +     *           if the caller may not perform system actions, or the security check itself fails
 +     * @throws RuntimeException
 +     *           if this server's lock has not been acquired, or the master lock is not held or cannot be verified
 +     */
 +    private void checkPermission(TCredentials credentials, String lock, boolean requiresSystemPermission, final String request)
 +        throws ThriftSecurityException {
 +      if (requiresSystemPermission) {
 +        boolean fatal = false;
 +        try {
 +          log.debug("Got " + request + " message from user: " + credentials.getPrincipal());
 +          if (!security.canPerformSystemActions(credentials)) {
 +            log.warn("Got " + request + " message from user: " + credentials.getPrincipal());
 +            throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +          }
 +        } catch (ThriftSecurityException e) {
 +          // note: this also catches the PERMISSION_DENIED exception thrown just above
 +          log.warn("Got " + request + " message from unauthenticatable user: " + e.getUser());
 +          if (e.getUser().equals(SecurityConstants.SYSTEM_PRINCIPAL)) {
 +            // a failure for the system principal is treated as fatal; Halt.halt is invoked from the finally block below
 +            log.fatal("Got message from a service with a mismatched configuration. Please ensure a compatible configuration.", e);
 +            fatal = true;
 +          }
 +          throw e;
 +        } finally {
 +          if (fatal) {
 +            Halt.halt(1, new Runnable() {
 +              @Override
 +              public void run() {
 +                logGCInfo(getSystemConfiguration());
 +              }
 +            });
 +          }
 +        }
 +      }
 +      
 +      // refuse requests until this server has acquired its own lock
 +      if (tabletServerLock == null || !tabletServerLock.wasLockAcquired()) {
 +        log.warn("Got " + request + " message from master before lock acquired, ignoring...");
 +        throw new RuntimeException("Lock not acquired");
 +      }
 +      
 +      // the lock was acquired at some point but is no longer held: halt
 +      if (tabletServerLock != null && tabletServerLock.wasLockAcquired() && !tabletServerLock.isLocked()) {
 +        Halt.halt(1, new Runnable() {
 +          @Override
 +          public void run() {
 +            log.info("Tablet server no longer holds lock during checkPermission() : " + request + ", exiting");
 +            logGCInfo(getSystemConfiguration());
 +          }
 +        });
 +      }
 +      
 +      if (lock != null) {
 +        ZooUtil.LockID lid = new ZooUtil.LockID(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK, lock);
 +        
 +        try {
 +          if (!ZooLock.isLockHeld(masterLockCache, lid)) {
 +            // maybe the cache is out of date and a new master holds the
 +            // lock?
 +            masterLockCache.clear();
 +            if (!ZooLock.isLockHeld(masterLockCache, lid)) {
 +              log.warn("Got " + request + " message from a master that does not hold the current lock " + lock);
 +              throw new RuntimeException("bad master lock");
 +            }
 +          }
 +        } catch (Exception e) {
 +          // note: the RuntimeException thrown above is also caught here and wrapped a second time
 +          throw new RuntimeException("bad master lock", e);
 +        }
 +      }
 +    }
 +    
 +    @Override
 +    public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, final TKeyExtent textent) {
 +      
 +      try {
 +        checkPermission(credentials, lock, true, "loadTablet");
 +      } catch (ThriftSecurityException e) {
 +        log.error(e, e);
 +        throw new RuntimeException(e);
 +      }
 +      
 +      final KeyExtent extent = new KeyExtent(textent);
 +      
 +      synchronized (unopenedTablets) {
 +        synchronized (openingTablets) {
 +          synchronized (onlineTablets) {
 +            
 +            // checking if this exact tablet is in any of the sets
 +            // below is not a strong enough check
 +            // when splits and fix splits occurring
 +            
 +            Set<KeyExtent> unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets);
 +            Set<KeyExtent> openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets);
 +            Set<KeyExtent> onlineOverlapping = KeyExtent.findOverlapping(extent, onlineTablets);
 +            Set<KeyExtent> all = new HashSet<KeyExtent>();
 +            all.addAll(unopenedOverlapping);
 +            all.addAll(openingOverlapping);
 +            all.addAll(onlineOverlapping);
 +            
 +            if (!all.isEmpty()) {
 +              if (all.size() != 1 || !all.contains(extent)) {
 +                log.error("Tablet " + extent + " overlaps previously assigned " + unopenedOverlapping + " " + openingOverlapping + " " + onlineOverlapping);
 +              }
 +              return;
 +            }
 +            
 +            unopenedTablets.add(extent);
 +          }
 +        }
 +      }
 +      
 +      // add the assignment job to the appropriate queue
 +      log.info("Loading tablet " + extent);
 +      
 +      final Runnable ah = new LoggingRunnable(log, new AssignmentHandler(extent));
 +      // Root tablet assignment must take place immediately
 +      if (extent.isRootTablet()) {
 +        new Daemon("Root Tablet Assignment") {
 +          @Override
 +          public void run() {
 +            ah.run();
 +            if (onlineTablets.containsKey(extent)) {
 +              log.info("Root tablet loaded: " + extent);
 +            } else {
 +              log.info("Root tablet failed to load");
 +            }
 +            
 +          }
 +        }.start();
 +      } else {
 +        if (extent.isMeta()) {
 +          resourceManager.addMetaDataAssignment(ah);
 +        } else {
 +          resourceManager.addAssignment(ah);
 +        }
 +      }
 +    }
 +    
 +    /**
 +     * Handles a request to unload a tablet by queueing an {@code UnloadTabletHandler} for the extent as a migration task.
 +     */
 +    @Override
 +    public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent, boolean save) {
 +      try {
 +        checkPermission(credentials, lock, true, "unloadTablet");
 +      } catch (ThriftSecurityException e) {
 +        log.error(e, e);
 +        throw new RuntimeException(e);
 +      }
 +
 +      KeyExtent extent = new KeyExtent(textent);
 +      Runnable unloadTask = new LoggingRunnable(log, new UnloadTabletHandler(extent, save));
 +      resourceManager.addMigration(extent, unloadTask);
 +    }
 +    
 +    /**
 +     * Flushes every online tablet of {@code tableId} that overlaps the given row range. The flush id is read once, from the first matching tablet, and
 +     * reused for all of them.
 +     */
 +    @Override
 +    public void flush(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) {
 +      try {
 +        checkPermission(credentials, lock, true, "flush");
 +      } catch (ThriftSecurityException e) {
 +        log.error(e, e);
 +        throw new RuntimeException(e);
 +      }
 +
 +      List<Tablet> matching = new ArrayList<Tablet>();
 +
 +      KeyExtent range = new KeyExtent(new Text(tableId), ByteBufferUtil.toText(endRow), ByteBufferUtil.toText(startRow));
 +
 +      synchronized (onlineTablets) {
 +        for (Tablet online : onlineTablets.values()) {
 +          if (range.overlaps(online.getExtent())) {
 +            matching.add(online);
 +          }
 +        }
 +      }
 +
 +      if (matching.isEmpty()) {
 +        return;
 +      }
 +
 +      // read the flush id once from zookeeper instead of reading it for
 +      // each tablet
 +      Long flushID;
 +      try {
 +        flushID = matching.get(0).getFlushID();
 +      } catch (NoNodeException e) {
 +        // table was probably deleted
 +        log.info("Asked to flush table that has no flush id " + range + " " + e.getMessage());
 +        return;
 +      }
 +
 +      for (Tablet tablet : matching) {
 +        tablet.flush(flushID);
 +      }
 +    }
 +    
 +    /**
 +     * Flushes a single tablet if it is online on this server; does nothing otherwise.
 +     */
 +    @Override
 +    public void flushTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) throws TException {
 +      try {
 +        checkPermission(credentials, lock, true, "flushTablet");
 +      } catch (ThriftSecurityException e) {
 +        log.error(e, e);
 +        throw new RuntimeException(e);
 +      }
 +
 +      KeyExtent requested = new KeyExtent(textent);
 +      Tablet tablet = onlineTablets.get(requested);
 +      if (tablet == null) {
 +        return;
 +      }
 +
 +      log.info("Flushing " + tablet.getExtent());
 +      try {
 +        tablet.flush(tablet.getFlushID());
 +      } catch (NoNodeException nne) {
 +        log.info("Asked to flush tablet that has no flush id " + requested + " " + nne.getMessage());
 +      }
 +    }
 +    
 +    @Override
 +    public void halt(TInfo tinfo, TCredentials credentials, String lock) throws ThriftSecurityException {
 +      
 +        checkPermission(credentials, lock, true, "halt");
 +      
 +      Halt.halt(0, new Runnable() {
 +        @Override
 +        public void run() {
 +          log.info("Master requested tablet server halt");
 +          logGCInfo(getSystemConfiguration());
 +          serverStopRequested = true;
 +          try {
 +            tabletServerLock.unlock();
 +          } catch (Exception e) {
 +            log.error(e, e);
 +          }
 +        }
 +      });
 +    }
 +    
 +    @Override
 +    public void fastHalt(TInfo info, TCredentials credentials, String lock) {
 +      try {
 +        halt(info, credentials, lock);
 +      } catch (Exception e) {
 +        log.warn("Error halting", e);
 +      }
 +    }
 +    
 +    @Override
 +    public TabletStats getHistoricalStats(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
 +      return statsKeeper.getTabletStats();
 +    }
 +    
 +    @Override
 +    public List<ActiveScan> getActiveScans(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
 +      try {
 +        checkPermission(credentials, null, true, "getScans");
 +      } catch (ThriftSecurityException e) {
 +        log.error(e, e);
 +        throw new RuntimeException(e);
 +      }
 +      
 +      return sessionManager.getActiveScans();
 +    }
 +    
 +    /**
 +     * Asks the given tablet to chop its files if it is online on this server; does nothing otherwise.
 +     */
 +    @Override
 +    public void chop(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) throws TException {
 +      try {
 +        checkPermission(credentials, lock, true, "chop");
 +      } catch (ThriftSecurityException e) {
 +        log.error(e, e);
 +        throw new RuntimeException(e);
 +      }
 +
 +      Tablet target = onlineTablets.get(new KeyExtent(textent));
 +      if (target == null) {
 +        return;
 +      }
 +      target.chopFiles();
 +    }
 +    
 +    /**
 +     * Starts a full compaction on every online tablet of {@code tableId} that overlaps the given row range. The compaction id is read once, from the first
 +     * matching tablet, and reused for all of them.
 +     */
 +    @Override
 +    public void compact(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow)
 +        throws TException {
 +      try {
 +        checkPermission(credentials, lock, true, "compact");
 +      } catch (ThriftSecurityException e) {
 +        log.error(e, e);
 +        throw new RuntimeException(e);
 +      }
 +
 +      KeyExtent range = new KeyExtent(new Text(tableId), ByteBufferUtil.toText(endRow), ByteBufferUtil.toText(startRow));
 +
 +      List<Tablet> matching = new ArrayList<Tablet>();
 +      synchronized (onlineTablets) {
 +        for (Tablet online : onlineTablets.values()) {
 +          if (range.overlaps(online.getExtent())) {
 +            matching.add(online);
 +          }
 +        }
 +      }
 +
 +      if (matching.isEmpty()) {
 +        return;
 +      }
 +
 +      // all tablets belong to the same table id, so the compaction id only
 +      // needs to be read once
 +      Long compactionId;
 +      try {
 +        compactionId = matching.get(0).getCompactionID().getFirst();
 +      } catch (NoNodeException e) {
 +        log.info("Asked to compact table with no compaction id " + range + " " + e.getMessage());
 +        return;
 +      }
 +
 +      for (Tablet tablet : matching) {
 +        tablet.compactAll(compactionId);
 +      }
 +    }
 +    
 +    /*
 +     * (non-Javadoc)
 +     * 
 +     * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#removeLogs(org.apache.accumulo.trace.thrift.TInfo,
 +     * org.apache.accumulo.core.security.thrift.Credentials, java.util.List)
 +     */
 +    @Override
 +    public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
 +      String myname = getClientAddressString();
 +      myname = myname.replace(':', '+');
 +      Path logDir = new Path(Constants.getWalDirectory(acuConf), myname);
 +      Set<String> loggers = new HashSet<String>();
 +      logger.getLoggers(loggers);
 +      nextFile: for (String filename : filenames) {
 +        for (String logger : loggers) {
 +          if (logger.contains(filename))
 +            continue nextFile;
 +        }
 +        List<Tablet> onlineTabletsCopy = new ArrayList<Tablet>();
 +        synchronized (onlineTablets) {
 +          onlineTabletsCopy.addAll(onlineTablets.values());
 +        }
 +        for (Tablet tablet : onlineTabletsCopy) {
 +          for (String current : tablet.getCurrentLogs()) {
 +            if (current.contains(filename)) {
 +              log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent());
 +              continue nextFile;
 +            }
 +          }
 +        }
 +        try {
 +          String source = logDir + "/" + filename;
 +          if (acuConf.getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
 +            String walogArchive = Constants.getBaseDir(acuConf) + "/walogArchive";
 +            fs.mkdirs(new Path(walogArchive));
 +            String dest = walogArchive + "/" + filename;
 +            log.info("Archiving walog " + source + " to " + dest);
 +            if (!fs.rename(new Path(source), new Path(dest)))
 +              log.error("rename is unsuccessful");
 +          } else {
 +            log.info("Deleting walog " + filename);
 +            Trash trash = new Trash(fs, fs.getConf());
 +            Path sourcePath = new Path(source);
 +            if (!(!acuConf.getBoolean(Property.GC_TRASH_IGNORE) && trash.moveToTrash(sourcePath)) && !fs.delete(sourcePath, true))
 +              log.warn("Failed to delete walog " + source);
 +            Path recoveryPath = new Path(Constants.getRecoveryDir(acuConf), filename);
 +            try {
 +              if (trash.moveToTrash(recoveryPath) || fs.delete(recoveryPath, true))
 +                log.info("Deleted any recovery log " + filename);
 +            } catch (FileNotFoundException ex) {
 +              // ignore
 +            }
 +            
 +          }
 +        } catch (IOException e) {
 +          log.warn("Error attempting to delete write-ahead log " + filename + ": " + e);
 +        }
 +      }
 +    }
 +    
 +    @Override
 +    public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
 +      try {
 +        checkPermission(credentials, null, true, "getActiveCompactions");
 +      } catch (ThriftSecurityException e) {
 +        log.error(e, e);
 +        throw new RuntimeException(e);
 +      } 
 +      
 +      List<CompactionInfo> compactions = Compactor.getRunningCompactions();
 +      List<ActiveCompaction> ret = new ArrayList<ActiveCompaction>(compactions.size());
 +      
 +      for (CompactionInfo compactionInfo : compactions) {
 +        ret.add(compactionInfo.toThrift());
 +      }
 +      
 +      return ret;
 +    }
 +  }
 +  
 +  /**
 +   * Task that splits one tablet, unless major compactions have been disabled by the time it runs.
 +   */
 +  private class SplitRunner implements Runnable {
 +    private final Tablet tablet;
 +
 +    public SplitRunner(Tablet tablet) {
 +      this.tablet = tablet;
 +    }
 +
 +    @Override
 +    public void run() {
 +      // split tasks that were queued when shutdown was initiated fall
 +      // through here without doing anything
 +      if (!majorCompactorDisabled) {
 +        splitTablet(tablet);
 +      }
 +    }
 +  }
 +  
 +  boolean isMajorCompactionDisabled() {
 +    return majorCompactorDisabled;
 +  }
 +  
 +  /**
 +   * Hands a {@code SplitRunner} for the given tablet, wrapped in a {@code LoggingRunnable}, to the resource manager for execution.
 +   */
 +  void executeSplit(Tablet tablet) {
 +    Runnable splitTask = new LoggingRunnable(log, new SplitRunner(tablet));
 +    resourceManager.executeSplit(tablet.getExtent(), splitTask);
 +  }
 +  
 +  private class MajorCompactor implements Runnable {
 +    
 +    @Override
 +    public void run() {
 +      while (!majorCompactorDisabled) {
 +        try {
 +          UtilWaitThread.sleep(getSystemConfiguration().getTimeInMillis(Property.TSERV_MAJC_DELAY));
 +          
 +          TreeMap<KeyExtent,Tablet> copyOnlineTablets = new TreeMap<KeyExtent,Tablet>();
 +          
 +          synchronized (onlineTablets) {
 +            copyOnlineTablets.putAll(onlineTablets); // avoid
 +            // concurrent
 +            // modification
 +          }
 +          
 +          int numMajorCompactionsInProgress = 0;
 +          
 +          Iterator<Entry<KeyExtent,Tablet>> iter = copyOnlineTablets.entrySet().iterator();
 +         

<TRUNCATED>