You are viewing a plain-text version of this content. The canonical link for it is "here" (the hyperlink is not preserved in this plain-text version).
Posted to commits@accumulo.apache.org by vi...@apache.org on 2014/01/29 00:17:11 UTC

[1/4] git commit: ACCUMULO-2144 fixed listcompactions and listscans so they no longer wrap the appropriate exceptions in RTEs

Updated Branches:
  refs/heads/master 21f09f42a -> b8f2b72c2


ACCUMULO-2144: fixed listcompactions and listscans so that they rethrow ThriftSecurityException directly instead of wrapping it in a RuntimeException (RTE)


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/16e095fd
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/16e095fd
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/16e095fd

Branch: refs/heads/master
Commit: 16e095fde29bd0898d3cdeda525b1ac56497e47e
Parents: 07da9e3
Author: John Vines <vi...@apache.org>
Authored: Tue Jan 28 18:11:02 2014 -0500
Committer: John Vines <vi...@apache.org>
Committed: Tue Jan 28 18:11:02 2014 -0500

----------------------------------------------------------------------
 .../org/apache/accumulo/server/tabletserver/TabletServer.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/16e095fd/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index ebbc214..9943926 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -1995,7 +1995,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
         checkPermission(credentials, null, true, "getScans");
       } catch (ThriftSecurityException e) {
         log.error(e, e);
-        throw new RuntimeException(e);
+        throw e;
       }
       
       return sessionManager.getActiveScans();
@@ -2120,7 +2120,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
         checkPermission(credentials, null, true, "getActiveCompactions");
       } catch (ThriftSecurityException e) {
         log.error(e, e);
-        throw new RuntimeException(e);
+        throw e;
       } 
       
       List<CompactionInfo> compactions = Compactor.getRunningCompactions();


[4/4] git commit: Merge branch '1.6.0-SNAPSHOT'

Posted by vi...@apache.org.
Merge branch '1.6.0-SNAPSHOT'


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b8f2b72c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b8f2b72c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b8f2b72c

Branch: refs/heads/master
Commit: b8f2b72c219d1af3db412704a845c6293d928ac7
Parents: 21f09f4 3f92e01
Author: John Vines <vi...@apache.org>
Authored: Tue Jan 28 18:17:06 2014 -0500
Committer: John Vines <vi...@apache.org>
Committed: Tue Jan 28 18:17:06 2014 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/accumulo/tserver/TabletServer.java  | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------



[2/4] Merge remote-tracking branch 'origin/1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Posted by vi...@apache.org.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3f92e012/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 71d7328,0000000..7fdc9b2
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -1,3898 -1,0 +1,3898 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver;
 +
 +import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.lang.management.GarbageCollectorMXBean;
 +import java.lang.management.ManagementFactory;
 +import java.net.Socket;
 +import java.net.UnknownHostException;
 +import java.nio.ByteBuffer;
 +import java.security.SecureRandom;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.EnumMap;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.SortedSet;
 +import java.util.TimerTask;
 +import java.util.TreeMap;
 +import java.util.TreeSet;
 +import java.util.concurrent.ArrayBlockingQueue;
 +import java.util.concurrent.BlockingDeque;
 +import java.util.concurrent.CancellationException;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.LinkedBlockingDeque;
 +import java.util.concurrent.RunnableFuture;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TimeoutException;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +import javax.management.ObjectName;
 +import javax.management.StandardMBean;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.impl.CompressedIterators;
 +import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
 +import org.apache.accumulo.core.client.impl.ScannerImpl;
 +import org.apache.accumulo.core.client.impl.TabletType;
 +import org.apache.accumulo.core.client.impl.Translator;
 +import org.apache.accumulo.core.client.impl.Translator.TKeyExtentTranslator;
 +import org.apache.accumulo.core.client.impl.Translator.TRangeTranslator;
 +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.constraints.Constraint.Environment;
 +import org.apache.accumulo.core.constraints.Violations;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Column;
 +import org.apache.accumulo.core.data.ConstraintViolationSummary;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.data.thrift.InitialMultiScan;
 +import org.apache.accumulo.core.data.thrift.InitialScan;
 +import org.apache.accumulo.core.data.thrift.IterInfo;
 +import org.apache.accumulo.core.data.thrift.MapFileInfo;
 +import org.apache.accumulo.core.data.thrift.MultiScanResult;
 +import org.apache.accumulo.core.data.thrift.ScanResult;
 +import org.apache.accumulo.core.data.thrift.TCMResult;
 +import org.apache.accumulo.core.data.thrift.TCMStatus;
 +import org.apache.accumulo.core.data.thrift.TColumn;
 +import org.apache.accumulo.core.data.thrift.TCondition;
 +import org.apache.accumulo.core.data.thrift.TConditionalMutation;
 +import org.apache.accumulo.core.data.thrift.TConditionalSession;
 +import org.apache.accumulo.core.data.thrift.TKey;
 +import org.apache.accumulo.core.data.thrift.TKeyExtent;
 +import org.apache.accumulo.core.data.thrift.TKeyValue;
 +import org.apache.accumulo.core.data.thrift.TMutation;
 +import org.apache.accumulo.core.data.thrift.TRange;
 +import org.apache.accumulo.core.data.thrift.UpdateErrors;
 +import org.apache.accumulo.core.iterators.IterationInterruptedException;
 +import org.apache.accumulo.core.master.thrift.Compacting;
 +import org.apache.accumulo.core.master.thrift.MasterClientService;
 +import org.apache.accumulo.core.master.thrift.TableInfo;
 +import org.apache.accumulo.core.master.thrift.TabletLoadState;
 +import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.security.AuthorizationContainer;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.SecurityUtil;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.core.tabletserver.log.LogEntry;
 +import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
 +import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 +import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
 +import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 +import org.apache.accumulo.core.tabletserver.thrift.ScanState;
 +import org.apache.accumulo.core.tabletserver.thrift.ScanType;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 +import org.apache.accumulo.core.util.ByteBufferUtil;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.core.util.ColumnFQ;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.util.LoggingRunnable;
 +import org.apache.accumulo.core.util.MapCounter;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.ServerServices;
 +import org.apache.accumulo.core.util.ServerServices.Service;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
 +import org.apache.accumulo.core.util.Stat;
 +import org.apache.accumulo.core.util.ThriftUtil;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 +import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 +import org.apache.accumulo.server.Accumulo;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.ServerOpts;
 +import org.apache.accumulo.server.client.ClientServiceHandler;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.conf.TableConfiguration;
 +import org.apache.accumulo.server.data.ServerMutation;
 +import org.apache.accumulo.server.fs.FileRef;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManager.FileType;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.server.master.recovery.RecoveryPath;
 +import org.apache.accumulo.server.master.state.Assignment;
 +import org.apache.accumulo.server.master.state.DistributedStoreException;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.master.state.TabletLocationState;
 +import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
 +import org.apache.accumulo.server.master.state.TabletStateStore;
 +import org.apache.accumulo.server.master.state.ZooTabletStateStore;
 +import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
 +import org.apache.accumulo.server.problems.ProblemReport;
 +import org.apache.accumulo.server.problems.ProblemReports;
 +import org.apache.accumulo.server.security.AuditedSecurityOperation;
 +import org.apache.accumulo.server.security.SecurityOperation;
 +import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.util.FileSystemMonitor;
 +import org.apache.accumulo.server.util.Halt;
 +import org.apache.accumulo.server.util.MasterMetadataUtil;
 +import org.apache.accumulo.server.util.MetadataTableUtil;
 +import org.apache.accumulo.server.util.TServerUtils;
 +import org.apache.accumulo.server.util.TServerUtils.ServerAddress;
 +import org.apache.accumulo.server.util.time.RelativeTime;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 +import org.apache.accumulo.server.zookeeper.TransactionWatcher;
 +import org.apache.accumulo.server.zookeeper.ZooCache;
 +import org.apache.accumulo.server.zookeeper.ZooLock;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 +import org.apache.accumulo.start.classloader.vfs.ContextManager;
 +import org.apache.accumulo.trace.instrument.Span;
 +import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
 +import org.apache.accumulo.trace.thrift.TInfo;
 +import org.apache.accumulo.tserver.Compactor.CompactionInfo;
 +import org.apache.accumulo.tserver.RowLocks.RowLock;
 +import org.apache.accumulo.tserver.Tablet.CommitSession;
 +import org.apache.accumulo.tserver.Tablet.KVEntry;
 +import org.apache.accumulo.tserver.Tablet.LookupResult;
 +import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
 +import org.apache.accumulo.tserver.Tablet.ScanBatch;
 +import org.apache.accumulo.tserver.Tablet.Scanner;
 +import org.apache.accumulo.tserver.Tablet.SplitInfo;
 +import org.apache.accumulo.tserver.Tablet.TConstraintViolationException;
 +import org.apache.accumulo.tserver.Tablet.TabletClosedException;
 +import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
 +import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
 +import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
 +import org.apache.accumulo.tserver.data.ServerConditionalMutation;
 +import org.apache.accumulo.tserver.log.DfsLogger;
 +import org.apache.accumulo.tserver.log.LogSorter;
 +import org.apache.accumulo.tserver.log.MutationReceiver;
 +import org.apache.accumulo.tserver.log.TabletServerLogger;
 +import org.apache.accumulo.tserver.mastermessage.MasterMessage;
 +import org.apache.accumulo.tserver.mastermessage.SplitReportMessage;
 +import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
 +import org.apache.accumulo.tserver.metrics.TabletServerMBean;
 +import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
 +import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
 +import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics;
 +import org.apache.commons.collections.map.LRUMap;
 +import org.apache.hadoop.fs.FSError;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.TProcessor;
 +import org.apache.thrift.TServiceClient;
 +import org.apache.thrift.server.TServer;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.KeeperException.NoNodeException;
 +
 +import com.google.common.net.HostAndPort;
 +
 +/**
 + * Run states of a ScanTask: a task starts out QUEUED, and FINISHED is what ScanTask#isDone() checks for.
 + */
 +enum ScanRunState {
 +  QUEUED,
 +  RUNNING,
 +  FINISHED
 +}
 +
 +public class TabletServer extends AbstractMetricsImpl implements org.apache.accumulo.tserver.metrics.TabletServerMBean {
 +  private static final Logger log = Logger.getLogger(TabletServer.class);
 +
 +  // cumulative collection time (GarbageCollectorMXBean.getCollectionTime) recorded by the previous logGCInfo call, keyed by collector name
 +  private static HashMap<String,Long> prevGcTime = new HashMap<String,Long>();
 +  // Runtime.freeMemory() as observed by the previous logGCInfo call
 +  private static long lastMemorySize = 0;
 +  // number of consecutive logGCInfo calls in which GC time grew; reset when it does not grow or after a low-memory warning
 +  private static long gcTimeIncreasedCount;
 +
 +  // NOTE(review): usages of the two constants below are outside this view; names suggest a scan-result wait bound and a
 +  // "recently split" window, both in milliseconds (note the MILLIES spelling)
 +  private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
 +  private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
 +
 +  private TabletServerLogger logger;
 +
 +  protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
 +
 +  // both assigned in the constructor
 +  private ServerConfiguration serverConfig;
 +  private LogSorter logSorter = null;
 +
 +  /**
 +   * Creates a tablet server bound to the given configuration and volume manager and schedules, on the shared SimpleTimer, a recurring task
 +   * (arguments 5000, 5000 — presumably a 5 second initial delay and period) that refreshes the rates of every online tablet.
 +   * 
 +   * @param conf
 +   *          server configuration; also the source of this server's {@link Instance}
 +   * @param fs
 +   *          volume manager used by this server and by its log sorter
 +   */
 +  public TabletServer(ServerConfiguration conf, VolumeManager fs) {
 +    super();
 +    this.serverConfig = conf;
 +    this.instance = conf.getInstance();
 +    this.fs = fs;
 +    this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
 +
 +    Runnable rateUpdater = new Runnable() {
 +      @Override
 +      public void run() {
 +        synchronized (onlineTablets) {
 +          long now = System.currentTimeMillis();
 +          for (Tablet tablet : onlineTablets.values()) {
 +            try {
 +              tablet.updateRates(now);
 +            } catch (Exception ex) {
 +              // log and continue so one failing tablet does not stop rate updates for the rest
 +              log.error(ex, ex);
 +            }
 +          }
 +        }
 +      }
 +    };
 +    SimpleTimer.getInstance().schedule(rateUpdater, 5000, 5000);
 +  }
 +
 +  /**
 +   * Logs a one-line summary of per-collector GC time and free memory, but only when GC time changed or free memory rose since the previous
 +   * call. Warns when GC time has grown on more than three consecutive calls while free memory is under 5% of the max heap, and halts the
 +   * process when any collector's time grew by more than the ZooKeeper timeout since the previous call.
 +   * 
 +   * @param conf
 +   *          configuration supplying {@link Property#INSTANCE_ZK_TIMEOUT}
 +   */
 +  private synchronized static void logGCInfo(AccumuloConfiguration conf) {
 +    Runtime runtime = Runtime.getRuntime();
 +    StringBuilder summary = new StringBuilder("gc");
 +    boolean changed = false;
 +    long largestDelta = 0;
 +
 +    for (GarbageCollectorMXBean collector : ManagementFactory.getGarbageCollectorMXBeans()) {
 +      String name = collector.getName();
 +      Long previous = prevGcTime.get(name);
 +      long previousMillis = (previous == null) ? 0L : previous.longValue();
 +      long totalMillis = collector.getCollectionTime();
 +      long delta = totalMillis - previousMillis;
 +
 +      if (delta != 0) {
 +        changed = true;
 +      }
 +
 +      summary.append(String.format(" %s=%,.2f(+%,.2f) secs", name, totalMillis / 1000.0, delta / 1000.0));
 +      largestDelta = Math.max(delta, largestDelta);
 +      prevGcTime.put(name, totalMillis);
 +    }
 +
 +    long freeMemory = runtime.freeMemory();
 +    if (largestDelta != 0) {
 +      gcTimeIncreasedCount++;
 +      if (gcTimeIncreasedCount > 3 && freeMemory < runtime.maxMemory() * 0.05) {
 +        log.warn("Running low on memory");
 +        gcTimeIncreasedCount = 0;
 +      }
 +    } else {
 +      gcTimeIncreasedCount = 0;
 +    }
 +
 +    // a rise in free memory is also worth logging
 +    changed |= freeMemory > lastMemorySize;
 +
 +    long memoryDelta = freeMemory - lastMemorySize;
 +    // negative deltas already carry '-' from the formatter; only positive ones get an explicit '+'
 +    String sign = (memoryDelta > 0) ? "+" : "";
 +    summary.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", freeMemory, sign, memoryDelta, runtime.totalMemory()));
 +
 +    if (changed) {
 +      log.debug(summary.toString());
 +    }
 +
 +    // per the halt message, GC pauses this long may interfere with the lock keep-alive
 +    long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
 +    if (largestDelta > keepAliveTimeout) {
 +      Halt.halt("Garbage collection may be interfering with lock keep-alive.  Halting.", -1);
 +    }
 +
 +    lastMemorySize = freeMemory;
 +  }
 +
 +  // NOTE(review): initialized and used outside this section of the file
 +  private TabletStatsKeeper statsKeeper;
 +
 +  /**
 +   * Base state shared by every client session tracked by SessionManager: owner, creation and last-access times, and whether a request currently
 +   * holds the session reserved.
 +   */
 +  private static class Session {
 +    // value of TServerUtils.clientAddress at construction time (presumably the calling client's address)
 +    String client = TServerUtils.clientAddress.get();
 +    String user;
 +
 +    // both set by SessionManager.createSession; lastAccessTime is refreshed on access and unreserve
 +    long startTime;
 +    long lastAccessTime;
 +
 +    // true while a request is using this session
 +    public boolean reserved;
 +
 +    /** Releases resources held by the session; the base implementation holds none. */
 +    public void cleanup() {}
 +  }
 +
 +  /**
 +   * Registry of client sessions keyed by a random id. A request reserves a session while using it; the idle sweep and
 +   * {@link #removeIfNotAccessed(long, long)} skip reserved sessions. removeSession, sweep and removeIfNotAccessed invoke Session#cleanup() outside
 +   * their synchronized sections.
 +   */
 +  private static class SessionManager {
 +
 +    SecureRandom random;
 +    Map<Long,Session> sessions;
 +    // idle time in milliseconds after which an unreserved session is swept
 +    long maxIdle;
 +
 +    /**
 +     * Creates the manager and schedules the idle sweep on SimpleTimer with a period of {@code max(maxIdle / 2, 1000)}.
 +     * 
 +     * @param conf
 +     *          supplies {@link Property#TSERV_SESSION_MAXIDLE}
 +     */
 +    SessionManager(AccumuloConfiguration conf) {
 +      random = new SecureRandom();
 +      sessions = new HashMap<Long,Session>();
 +
 +      maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
 +
 +      Runnable r = new Runnable() {
 +        @Override
 +        public void run() {
 +          sweep(maxIdle);
 +        }
 +      };
 +
 +      SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000));
 +    }
 +
 +    /**
 +     * Registers a session under a fresh random id that is not already in use, and stamps its start and last-access times with the current time.
 +     * 
 +     * @param session
 +     *          the session to register
 +     * @param reserve
 +     *          whether the session starts out reserved
 +     * @return the new session id
 +     */
 +    synchronized long createSession(Session session, boolean reserve) {
 +      long sid = random.nextLong();
 +
 +      while (sessions.containsKey(sid)) {
 +        sid = random.nextLong();
 +      }
 +
 +      sessions.put(sid, session);
 +
 +      session.reserved = reserve;
 +
 +      session.startTime = session.lastAccessTime = System.currentTimeMillis();
 +
 +      return sid;
 +    }
 +
 +    long getMaxIdleTime() {
 +      return maxIdle;
 +    }
 +
 +    /**
 +     * Reserves a session without waiting. While reserved, the session is skipped by the idle sweep and by removeIfNotAccessed.
 +     * 
 +     * @param sessionId
 +     *          id of the session to reserve
 +     * @return the now-reserved session, or null if there is no such session
 +     * @throws IllegalStateException
 +     *           if the session is already reserved
 +     */
 +    synchronized Session reserveSession(long sessionId) {
 +      Session session = sessions.get(sessionId);
 +      if (session != null) {
 +        if (session.reserved)
 +          throw new IllegalStateException();
 +        session.reserved = true;
 +      }
 +
 +      return session;
 +
 +    }
 +
 +    /**
 +     * Reserves a session, optionally waiting (in one-second slices, releasing this object's monitor) until its current holder unreserves it.
 +     * 
 +     * @param sessionId
 +     *          id of the session to reserve
 +     * @param wait
 +     *          if true, wait while the session is reserved; if false, fail immediately
 +     * @return the now-reserved session, or null if there is no such session
 +     * @throws IllegalStateException
 +     *           if {@code wait} is false and the session is already reserved
 +     * @throws RuntimeException
 +     *           if interrupted while waiting; the cause is the InterruptedException and the thread's interrupt status is restored
 +     */
 +    synchronized Session reserveSession(long sessionId, boolean wait) {
 +      Session session = sessions.get(sessionId);
 +      if (session != null) {
 +        while (wait && session.reserved) {
 +          try {
 +            wait(1000);
 +          } catch (InterruptedException e) {
 +            // throwing InterruptedException clears the interrupt flag; restore it so callers up the stack can still observe it
 +            Thread.currentThread().interrupt();
 +            throw new RuntimeException(e);
 +          }
 +        }
 +
 +        if (session.reserved)
 +          throw new IllegalStateException();
 +        session.reserved = true;
 +      }
 +
 +      return session;
 +
 +    }
 +
 +    /**
 +     * Releases a reservation, wakes threads waiting in {@link #reserveSession(long, boolean)}, and refreshes the last-access time.
 +     * 
 +     * @throws IllegalStateException
 +     *           if the session is not reserved
 +     */
 +    synchronized void unreserveSession(Session session) {
 +      if (!session.reserved)
 +        throw new IllegalStateException();
 +      notifyAll();
 +      session.reserved = false;
 +      session.lastAccessTime = System.currentTimeMillis();
 +    }
 +
 +    /** Unreserves the session with the given id, if it exists. */
 +    synchronized void unreserveSession(long sessionId) {
 +      Session session = getSession(sessionId);
 +      if (session != null)
 +        unreserveSession(session);
 +    }
 +
 +    /** Looks up a session, refreshing its last-access time; returns null if there is no such session. */
 +    synchronized Session getSession(long sessionId) {
 +      Session session = sessions.get(sessionId);
 +      if (session != null)
 +        session.lastAccessTime = System.currentTimeMillis();
 +      return session;
 +    }
 +
 +    Session removeSession(long sessionId) {
 +      return removeSession(sessionId, false);
 +    }
 +
 +    /**
 +     * Removes a session (reserved or not) and cleans it up outside the lock.
 +     * 
 +     * @param unreserve
 +     *          if true, also unreserve the removed session, which throws IllegalStateException if it was not reserved
 +     * @return the removed session, or null if there was none
 +     */
 +    Session removeSession(long sessionId, boolean unreserve) {
 +      Session session = null;
 +      synchronized (this) {
 +        session = sessions.remove(sessionId);
 +        if (unreserve && session != null)
 +          unreserveSession(session);
 +      }
 +
 +      // do clean up out side of lock..
 +      if (session != null)
 +        session.cleanup();
 +
 +      return session;
 +    }
 +
 +    /** Removes every unreserved session idle for longer than {@code maxIdle} milliseconds, then cleans them up outside the lock. */
 +    private void sweep(long maxIdle) {
 +      ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
 +      synchronized (this) {
 +        Iterator<Session> iter = sessions.values().iterator();
 +        while (iter.hasNext()) {
 +          Session session = iter.next();
 +          long idleTime = System.currentTimeMillis() - session.lastAccessTime;
 +          if (idleTime > maxIdle && !session.reserved) {
 +            iter.remove();
 +            sessionsToCleanup.add(session);
 +          }
 +        }
 +      }
 +
 +      // do clean up outside of lock
 +      for (Session session : sessionsToCleanup) {
 +        session.cleanup();
 +      }
 +    }
 +
 +    /**
 +     * Schedules removal of a session after {@code delay}. At that point it is removed only if it has not been accessed since this call (its
 +     * last-access time is unchanged) and it is not reserved.
 +     */
 +    synchronized void removeIfNotAccessed(final long sessionId, long delay) {
 +      Session session = sessions.get(sessionId);
 +      if (session != null) {
 +        final long removeTime = session.lastAccessTime;
 +        TimerTask r = new TimerTask() {
 +          @Override
 +          public void run() {
 +            Session sessionToCleanup = null;
 +            synchronized (SessionManager.this) {
 +              Session session2 = sessions.get(sessionId);
 +              if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) {
 +                sessions.remove(sessionId);
 +                sessionToCleanup = session2;
 +              }
 +            }
 +
 +            // call clean up outside of lock
 +            if (sessionToCleanup != null)
 +              sessionToCleanup.cleanup();
 +          }
 +        };
 +
 +        SimpleTimer.getInstance().schedule(r, delay);
 +      }
 +    }
 +
 +    /**
 +     * Counts, per table id, the scan sessions whose current task is not FINISHED (i.e. QUEUED or RUNNING). Sessions with no task are skipped.
 +     */
 +    public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
 +      Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
 +      for (Entry<Long,Session> entry : sessions.entrySet()) {
 +
 +        Session session = entry.getValue();
 +        ScanTask<?> nbt = null;
 +        String tableID = null;
 +
 +        if (session instanceof ScanSession) {
 +          ScanSession ss = (ScanSession) session;
 +          nbt = ss.nextBatchTask;
 +          tableID = ss.extent.getTableId().toString();
 +        } else if (session instanceof MultiScanSession) {
 +          MultiScanSession mss = (MultiScanSession) session;
 +          nbt = mss.lookupTask;
 +          tableID = mss.threadPoolExtent.getTableId().toString();
 +        }
 +
 +        if (nbt == null)
 +          continue;
 +
 +        ScanRunState srs = nbt.getScanRunState();
 +
 +        // finished tasks do not count as active
 +        if (srs == ScanRunState.FINISHED)
 +          continue;
 +
 +        MapCounter<ScanRunState> stateCounts = counts.get(tableID);
 +        if (stateCounts == null) {
 +          stateCounts = new MapCounter<ScanRunState>();
 +          counts.put(tableID, stateCounts);
 +        }
 +
 +        stateCounts.increment(srs, 1);
 +      }
 +
 +      return counts;
 +    }
 +
 +    /** Builds a snapshot of all single and batch scan sessions, with ages and idle times computed relative to the current time. */
 +    public synchronized List<ActiveScan> getActiveScans() {
 +
 +      ArrayList<ActiveScan> activeScans = new ArrayList<ActiveScan>();
 +
 +      long ct = System.currentTimeMillis();
 +
 +      for (Entry<Long,Session> entry : sessions.entrySet()) {
 +        Session session = entry.getValue();
 +        if (session instanceof ScanSession) {
 +          ScanSession ss = (ScanSession) session;
 +
 +          ScanState state = toScanState(ss.nextBatchTask);
 +
 +          activeScans.add(new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
 +              state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translator.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
 +
 +        } else if (session instanceof MultiScanSession) {
 +          MultiScanSession mss = (MultiScanSession) session;
 +
 +          ScanState state = toScanState(mss.lookupTask);
 +
 +          activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
 +              ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translator.CT), mss.ssiList, mss.ssio, mss.auths
 +                  .getAuthorizationsBB()));
 +        }
 +      }
 +
 +      return activeScans;
 +    }
 +
 +    /**
 +     * Maps a scan task's run state to the thrift ScanState reported to clients: no task, or a FINISHED task, is IDLE; a QUEUED task is QUEUED;
 +     * anything else is RUNNING.
 +     */
 +    private static ScanState toScanState(ScanTask<?> task) {
 +      if (task == null)
 +        return ScanState.IDLE;
 +
 +      switch (task.getScanRunState()) {
 +        case QUEUED:
 +          return ScanState.QUEUED;
 +        case FINISHED:
 +          return ScanState.IDLE;
 +        case RUNNING:
 +        default:
 +          return ScanState.RUNNING;
 +      }
 +    }
 +  }
 +
 +  static class TservConstraintEnv implements Environment {
 +
 +    private TCredentials credentials;
 +    private SecurityOperation security;
 +    private Authorizations auths;
 +    private KeyExtent ke;
 +
 +    TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) {
 +      this.security = secOp;
 +      this.credentials = credentials;
 +    }
 +
 +    void setExtent(KeyExtent ke) {
 +      this.ke = ke;
 +    }
 +
 +    @Override
 +    public KeyExtent getExtent() {
 +      return ke;
 +    }
 +
 +    @Override
 +    public String getUser() {
 +      return credentials.getPrincipal();
 +    }
 +
 +    @Override
 +    @Deprecated
 +    public Authorizations getAuthorizations() {
 +      if (auths == null)
 +        try {
 +          this.auths = security.getUserAuthorizations(credentials);
 +        } catch (ThriftSecurityException e) {
 +          throw new RuntimeException(e);
 +        }
 +      return auths;
 +    }
 +
 +    @Override
 +    public AuthorizationContainer getAuthorizationsContainer() {
 +      return new AuthorizationContainer() {
 +        @Override
 +        public boolean contains(ByteSequence auth) {
 +          try {
 +            return security.userHasAuthorizations(credentials,
 +                Collections.<ByteBuffer> singletonList(ByteBuffer.wrap(auth.getBackingArray(), auth.offset(), auth.length())));
 +          } catch (ThriftSecurityException e) {
 +            throw new RuntimeException(e);
 +          }
 +        }
 +      };
 +    }
 +  }
 +
 +  /**
 +   * Base class for scan work whose single result (a value or a Throwable) is handed back through a one-slot queue.
 +   * 
 +   * <p>
 +   * {@code state} moves INITIAL to ADDED when a result is published, or INITIAL to CANCELED on cancel. Both transitions use compareAndSet, so
 +   * only one of them can win. {@code runState} separately tracks QUEUED/RUNNING/FINISHED; only its initial QUEUED value is set here, so it is
 +   * presumably advanced by subclasses. {@link #get()} without a timeout is unsupported.
 +   */
 +  private abstract class ScanTask<T> implements RunnableFuture<T> {
 +
 +    // set to true by cancel(); presumably polled by the running task so it can stop early
 +    protected AtomicBoolean interruptFlag;
 +    // holds at most one result; set to null once the result is taken or the task is canceled
 +    protected ArrayBlockingQueue<Object> resultQueue;
 +    // one of INITIAL, ADDED, CANCELED
 +    protected AtomicInteger state;
 +    protected AtomicReference<ScanRunState> runState;
 +
 +    private static final int INITIAL = 1;
 +    private static final int ADDED = 2;
 +    private static final int CANCELED = 3;
 +
 +    ScanTask() {
 +      interruptFlag = new AtomicBoolean(false);
 +      runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
 +      state = new AtomicInteger(INITIAL);
 +      resultQueue = new ArrayBlockingQueue<Object>(1);
 +    }
 +
 +    /**
 +     * Publishes the task's result (a value or a Throwable). If the task was already canceled, the result is silently dropped.
 +     * 
 +     * @throws IllegalStateException
 +     *           if a result was already added
 +     */
 +    protected void addResult(Object o) {
 +      if (state.compareAndSet(INITIAL, ADDED))
 +        resultQueue.add(o);
 +      else if (state.get() == ADDED)
 +        throw new IllegalStateException("Tried to add more than one result");
 +    }
 +
 +    /**
 +     * Cancels the task if no result has been published yet. This sets the interrupt flag and discards the result queue.
 +     * 
 +     * @param mayInterruptIfRunning
 +     *          must be true
 +     * @return true if the task is (now) canceled; false if a result was already added
 +     * @throws IllegalArgumentException
 +     *           if {@code mayInterruptIfRunning} is false
 +     */
 +    @Override
 +    public boolean cancel(boolean mayInterruptIfRunning) {
 +      if (!mayInterruptIfRunning)
 +        throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task");
 +
 +      if (state.get() == CANCELED)
 +        return true;
 +
 +      if (state.compareAndSet(INITIAL, CANCELED)) {
 +        interruptFlag.set(true);
 +        resultQueue = null;
 +        return true;
 +      }
 +
 +      return false;
 +    }
 +
 +    /** Unsupported; use {@link #get(long, TimeUnit)} instead. */
 +    @Override
 +    public T get() throws InterruptedException, ExecutionException {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    /**
 +     * Waits up to the given time for the result. The result can be taken only once; a successful call disables further retrieval.
 +     * 
 +     * @throws CancellationException
 +     *           if the task was canceled before or while waiting
 +     * @throws TimeoutException
 +     *           if no result arrived in time
 +     * @throws ExecutionException
 +     *           if the published result was a Throwable (wrapped as the cause)
 +     * @throws IllegalStateException
 +     *           if the result was already taken
 +     */
 +    @SuppressWarnings("unchecked")
 +    @Override
 +    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
 +
 +      // cancel() and a successful get() both set resultQueue to null, so read the field once, before checking state
 +      ArrayBlockingQueue<Object> localRQ = resultQueue;
 +
 +      if (state.get() == CANCELED)
 +        throw new CancellationException();
 +
 +      if (localRQ == null && state.get() == ADDED)
 +        throw new IllegalStateException("Tried to get result twice");
 +
 +      Object r = localRQ.poll(timeout, unit);
 +
 +      // could have been canceled while waiting
 +      if (state.get() == CANCELED) {
 +        if (r != null)
 +          throw new IllegalStateException("Nothing should have been added when in canceled state");
 +
 +        throw new CancellationException();
 +      }
 +
 +      if (r == null)
 +        throw new TimeoutException();
 +
 +      // make this method stop working now that something is being
 +      // returned
 +      resultQueue = null;
 +
 +      if (r instanceof Throwable)
 +        throw new ExecutionException((Throwable) r);
 +
 +      return (T) r;
 +    }
 +
 +    @Override
 +    public boolean isCancelled() {
 +      return state.get() == CANCELED;
 +    }
 +
 +    // NOTE(review): "done" here means only runState == FINISHED. cancel() never touches runState, so unless a subclass updates it,
 +    // a task canceled before running reports false, unlike the java.util.concurrent.Future contract. Confirm callers rely on this.
 +    @Override
 +    public boolean isDone() {
 +      return runState.get().equals(ScanRunState.FINISHED);
 +    }
 +
 +    public ScanRunState getScanRunState() {
 +      return runState.get();
 +    }
 +
 +  }
 +
 +  /**
 +   * Session state for conditional updates: the caller's identity and authorizations, the target table, and an interrupt flag raised on cleanup.
 +   */
 +  private static class ConditionalSession extends Session {
 +    // who the session belongs to and what they may see
 +    public TCredentials credentials;
 +    public Authorizations auths;
 +
 +    // table the conditional mutations target
 +    public String tableId;
 +
 +    // raised by cleanup(); presumably polled by in-flight work on this session -- confirm at use sites
 +    public AtomicBoolean interruptFlag;
 +
 +    @Override
 +    public void cleanup() {
 +      this.interruptFlag.set(true);
 +    }
 +  }
 +
 +  /**
 +   * Session state for a stream of client updates: credentials, the tablet currently being written, mutations buffered per tablet, per-tablet outcomes and
 +   * timing statistics.
 +   */
 +  private static class UpdateSession extends Session {
 +    // identity of the writer and the constraint environment built for it
 +    public TCredentials credentials;
 +    TservConstraintEnv cenv = null;
 +
 +    // tablet currently receiving mutations, and the mutations buffered per tablet with their total size
 +    public Tablet currentTablet;
 +    public Map<Tablet,List<Mutation>> queuedMutations = new HashMap<Tablet,List<Mutation>>();
 +    public long queuedMutationSize = 0;
 +
 +    // per-update outcome bookkeeping
 +    public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
 +    Map<KeyExtent,Long> failures = new HashMap<KeyExtent,Long>();
 +    HashMap<KeyExtent,SecurityErrorCode> authFailures = new HashMap<KeyExtent,SecurityErrorCode>();
 +    public Violations violations;
 +
 +    // counters and timing statistics
 +    public long totalUpdates = 0;
 +    public long flushTime = 0;
 +    Stat prepareTimes = new Stat();
 +    Stat walogTimes = new Stat();
 +    Stat commitTimes = new Stat();
 +    Stat authTimes = new Stat();
 +  }
 +
 +  /**
 +   * Session state for a single-tablet scan: scan parameters, the open scanner, the pending read-ahead task and per-session statistics.
 +   */
 +  private static class ScanSession extends Session {
 +    public KeyExtent extent;
 +    public HashSet<Column> columnSet;
 +    public List<IterInfo> ssiList;
 +    public Map<String,Map<String,String>> ssio;
 +    public Authorizations auths;
 +    public long entriesReturned = 0;
 +    public Stat nbTimes = new Stat();
 +    public long batchCount = 0;
 +    // pending read-ahead task, if any; reassigned (including to null) by the request threads that continue this scan
 +    public volatile ScanTask<ScanBatch> nextBatchTask;
 +    public AtomicBoolean interruptFlag;
 +    public Scanner scanner;
 +    public long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
 +
 +    /**
 +     * Cancels any pending read-ahead task, then closes the scanner even if cancellation fails.
 +     */
 +    @Override
 +    public void cleanup() {
 +      try {
 +        // read the volatile field once: checking it and then re-reading it could observe a concurrent reset to null
 +        ScanTask<ScanBatch> task = nextBatchTask;
 +        if (task != null)
 +          task.cancel(true);
 +      } finally {
 +        if (scanner != null)
 +          scanner.close();
 +      }
 +    }
 +
 +  }
 +
 +  /**
 +   * Session state for a batch (multi-tablet) scan: the remaining ranges per tablet, scan parameters, the pending lookup task and statistics.
 +   */
 +  private static class MultiScanSession extends Session {
 +    HashSet<Column> columnSet;
 +    // ranges still to be scanned, per tablet
 +    Map<KeyExtent,List<Range>> queries;
 +    public List<IterInfo> ssiList;
 +    public Map<String,Map<String,String>> ssio;
 +    public Authorizations auths;
 +
 +    // stats
 +    int numRanges;
 +    int numTablets;
 +    int numEntries;
 +    long totalLookupTime;
 +
 +    // pending read-ahead lookup, if any; reassigned (including to null) by the request threads that continue this scan
 +    public volatile ScanTask<MultiScanResult> lookupTask;
 +    // extent used to choose the read-ahead thread pool
 +    public KeyExtent threadPoolExtent;
 +
 +    @Override
 +    public void cleanup() {
 +      // read the volatile field once: checking it and then re-reading it could observe a concurrent reset to null
 +      ScanTask<MultiScanResult> task = lookupTask;
 +      if (task != null)
 +        task.cancel(true);
 +    }
 +  }
 +
 +  /**
 +   * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids
 +   * are monotonically increasing.
 +   * 
 +   */
 +  static class WriteTracker {
 +    // static, so ids increase across all WriteTracker instances
 +    private static final AtomicLong operationCounter = new AtomicLong(1);
 +    // ids of writes still in progress, kept separately per tablet type
 +    private final Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
 +
 +    WriteTracker() {
 +      for (TabletType ttype : TabletType.values()) {
 +        inProgressWrites.put(ttype, new TreeSet<Long>());
 +      }
 +    }
 +
 +    /**
 +     * Registers a new write against the given tablet type.
 +     * 
 +     * @return the operation id to hand to {@link #finishWrite(long)}
 +     */
 +    synchronized long startWrite(TabletType ttype) {
 +      long operationId = operationCounter.getAndIncrement();
 +      inProgressWrites.get(ttype).add(operationId);
 +      return operationId;
 +    }
 +
 +    /**
 +     * Marks a write as finished and wakes every thread blocked in {@link #waitForWrites(TabletType)}. The id -1 (returned for an empty tablet set) is ignored.
 +     * 
 +     * @throws IllegalArgumentException
 +     *           if the id is not currently in progress
 +     */
 +    synchronized void finishWrite(long operationId) {
 +      if (operationId == -1)
 +        return;
 +
 +      boolean removed = false;
 +
 +      for (TabletType ttype : TabletType.values()) {
 +        removed = inProgressWrites.get(ttype).remove(operationId);
 +        if (removed)
 +          break;
 +      }
 +
 +      if (!removed) {
 +        throw new IllegalArgumentException("Attempted to finish write not in progress,  operationId " + operationId);
 +      }
 +
 +      this.notifyAll();
 +    }
 +
 +    /**
 +     * Blocks until every write of the given tablet type that started before this call has finished. An interrupt does not cut the wait short, but the
 +     * thread's interrupt status is restored before returning so callers can still observe it.
 +     */
 +    synchronized void waitForWrites(TabletType ttype) {
 +      long operationId = operationCounter.getAndIncrement();
 +      boolean interrupted = false;
 +      try {
 +        // any in-progress id <= ours belongs to a write that started before this wait
 +        while (inProgressWrites.get(ttype).floor(operationId) != null) {
 +          try {
 +            this.wait();
 +          } catch (InterruptedException e) {
 +            log.error(e, e);
 +            // keep waiting to preserve the original contract, but remember the interrupt instead of discarding it
 +            interrupted = true;
 +          }
 +        }
 +      } finally {
 +        if (interrupted)
 +          Thread.currentThread().interrupt();
 +      }
 +    }
 +
 +    /**
 +     * Registers a write covering the given tablets, or returns -1 without registering anything when the set is empty.
 +     */
 +    public long startWrite(Set<Tablet> keySet) {
 +      if (keySet.isEmpty())
 +        return -1;
 +
 +      ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
 +
 +      for (Tablet tablet : keySet)
 +        extents.add(tablet.getExtent());
 +
 +      return startWrite(TabletType.type(extents));
 +    }
 +  }
 +
 +  public AccumuloConfiguration getSystemConfiguration() {
 +    return serverConfig.getConfiguration();
 +  }
 +
 +  TransactionWatcher watcher = new TransactionWatcher();
 +
 +  private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
 +
 +    // hands out, reserves and removes the scan/update sessions used by the handler methods below
 +    SessionManager sessionManager;
 +
 +    AccumuloConfiguration acuConf = getSystemConfiguration();
 +
 +    TabletServerUpdateMetrics updateMetrics = new TabletServerUpdateMetrics();
 +
 +    TabletServerScanMetrics scanMetrics = new TabletServerScanMetrics();
 +
 +    // tracks in-flight writes so scans that request waitForWrites can wait for them
 +    WriteTracker writeTracker = new WriteTracker();
 +
 +    private RowLocks rowLocks = new RowLocks();
 +
 +    /**
 +     * Creates the handler, its session manager and registers the update/scan metrics MBeans. A registration failure is logged and otherwise ignored, so the
 +     * handler is still created.
 +     */
 +    ThriftClientHandler() {
 +      super(instance, watcher, fs);
 +      log.debug(ThriftClientHandler.class.getName() + " created");
 +      sessionManager = new SessionManager(getSystemConfiguration());
 +      // Register the metrics MBean
 +      try {
 +        updateMetrics.register();
 +        scanMetrics.register();
 +      } catch (Exception e) {
 +        log.error("Exception registering MBean with MBean Server", e);
 +      }
 +    }
 +
 +    /**
 +     * Imports the given files into tablets hosted by this server. Requires system permission.
 +     * 
 +     * @return the extents that could not be imported (tablet not online here, or the import failed with an IOException)
 +     */
 +    @Override
 +    public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
 +        throws ThriftSecurityException {
 +
 +      if (!security.canPerformSystemActions(credentials))
 +        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +
 +      List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
 +
 +      for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
 +        TKeyExtent tke = entry.getKey();
 +        Map<String,MapFileInfo> fileMap = entry.getValue();
 +        Map<FileRef,MapFileInfo> qualifiedFiles = qualifyBulkFiles(fileMap);
 +        KeyExtent extent = new KeyExtent(tke);
 +
 +        Tablet importTablet = onlineTablets.get(extent);
 +        if (importTablet == null) {
 +          // tablet is not online here
 +          failures.add(tke);
 +          continue;
 +        }
 +
 +        try {
 +          importTablet.importMapFiles(tid, qualifiedFiles, setTime);
 +        } catch (IOException ioe) {
 +          log.info("files " + fileMap.keySet() + " not imported to " + extent + ": " + ioe.getMessage());
 +          failures.add(tke);
 +        }
 +      }
 +      return failures;
 +    }
 +
 +    // Qualifies each file name against the file system that owns it and keys the file info by the resulting FileRef.
 +    private Map<FileRef,MapFileInfo> qualifyBulkFiles(Map<String,MapFileInfo> fileMap) {
 +      Map<FileRef,MapFileInfo> qualified = new HashMap<FileRef,MapFileInfo>();
 +      for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
 +        Path raw = new Path(mapping.getKey());
 +        FileSystem owner = fs.getFileSystemByPath(raw);
 +        Path full = owner.makeQualified(raw);
 +        qualified.put(new FileRef(full.toString(), full), mapping.getValue());
 +      }
 +      return qualified;
 +    }
 +
 +    /**
 +     * Read-ahead task that reads the next batch of a single-tablet scan session and publishes it, or the failure, through {@code addResult}.
 +     */
 +    private class NextBatchTask extends ScanTask<ScanBatch> {
 +
 +      private long scanID;
 +
 +      NextBatchTask(long scanID, AtomicBoolean interruptFlag) {
 +        this.scanID = scanID;
 +        // replace the flag created by ScanTask with the caller's (continueScan passes the session's flag, which its scanner was created with)
 +        this.interruptFlag = interruptFlag;
 +
 +        // flag already raised (e.g. by an earlier cancellation): start out canceled
 +        if (interruptFlag.get())
 +          cancel(true);
 +      }
 +
 +      @Override
 +      public void run() {
 +
 +        final ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID);
 +        String oldThreadName = Thread.currentThread().getName();
 +
 +        try {
 +          // nothing to do if canceled or the session is gone; the finally block still marks the task FINISHED
 +          if (isCancelled() || scanSession == null)
 +            return;
 +
 +          runState.set(ScanRunState.RUNNING);
 +
 +          Thread.currentThread().setName(
 +              "User: " + scanSession.user + " Start: " + scanSession.startTime + " Client: " + scanSession.client + " Tablet: " + scanSession.extent);
 +
 +          Tablet tablet = onlineTablets.get(scanSession.extent);
 +
 +          // tablet is no longer online here: report NotServingTabletException to the waiting caller
 +          if (tablet == null) {
 +            addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
 +            return;
 +          }
 +
 +          long t1 = System.currentTimeMillis();
 +          ScanBatch batch = scanSession.scanner.read();
 +          long t2 = System.currentTimeMillis();
 +          scanSession.nbTimes.addStat(t2 - t1);
 +
 +          // there should only be one thing on the queue at a time, so
 +          // it should be ok to call add()
 +          // instead of put()... if add() fails because queue is at
 +          // capacity it means there is code
 +          // problem somewhere
 +          addResult(batch);
 +        } catch (TabletClosedException e) {
 +          addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
 +        } catch (IterationInterruptedException iie) {
 +          // an interrupt caused by our own cancellation is expected and publishes nothing
 +          if (!isCancelled()) {
 +            log.warn("Iteration interrupted, when scan not cancelled", iie);
 +            addResult(iie);
 +          }
 +        } catch (TooManyFilesException tmfe) {
 +          addResult(tmfe);
 +        } catch (Throwable e) {
 +          // published so the waiting get() rethrows it wrapped in an ExecutionException
 +          log.warn("exception while scanning tablet " + (scanSession == null ? "(unknown)" : scanSession.extent), e);
 +          addResult(e);
 +        } finally {
 +          runState.set(ScanRunState.FINISHED);
 +          Thread.currentThread().setName(oldThreadName);
 +        }
 +
 +      }
 +    }
 +
 +    /**
 +     * Read-ahead task for a batch (multi-tablet) scan session. It looks up the session's pending ranges tablet by tablet until either the table's
 +     * TABLE_SCAN_MAXMEM limit or a 4000 ms time budget is reached. It then publishes a {@link MultiScanResult} (or the failure) through {@code addResult}.
 +     */
 +    private class LookupTask extends ScanTask<MultiScanResult> {
 +
 +      private long scanID;
 +
 +      LookupTask(long scanID) {
 +        this.scanID = scanID;
 +      }
 +
 +      @Override
 +      public void run() {
 +        MultiScanSession session = (MultiScanSession) sessionManager.getSession(scanID);
 +        String oldThreadName = Thread.currentThread().getName();
 +
 +        try {
 +          // nothing to do if canceled or the session is gone; the finally block still marks the task FINISHED
 +          if (isCancelled() || session == null)
 +            return;
 +
 +          TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(instance, session.threadPoolExtent.getTableId().toString());
 +          long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
 +
 +          runState.set(ScanRunState.RUNNING);
 +          // NOTE(review): nothing is appended after "Table: " -- probably meant to include the table id
 +          Thread.currentThread().setName("Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: ");
 +
 +          long bytesAdded = 0;
 +          // time budget for one invocation, in milliseconds
 +          long maxScanTime = 4000;
 +
 +          long startTime = System.currentTimeMillis();
 +
 +          ArrayList<KVEntry> results = new ArrayList<KVEntry>();
 +          Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
 +          ArrayList<KeyExtent> fullScans = new ArrayList<KeyExtent>();
 +          KeyExtent partScan = null;
 +          Key partNextKey = null;
 +          boolean partNextKeyInclusive = false;
 +
 +          // entries are removed from session.queries as they are consumed; a partially scanned tablet is put back below
 +          Iterator<Entry<KeyExtent,List<Range>>> iter = session.queries.entrySet().iterator();
 +
 +          // check the time so that the read ahead thread is not monopolized
 +          while (iter.hasNext() && bytesAdded < maxResultsSize && (System.currentTimeMillis() - startTime) < maxScanTime) {
 +            Entry<KeyExtent,List<Range>> entry = iter.next();
 +
 +            iter.remove();
 +
 +            // check that tablet server is serving requested tablet
 +            Tablet tablet = onlineTablets.get(entry.getKey());
 +            if (tablet == null) {
 +              failures.put(entry.getKey(), entry.getValue());
 +              continue;
 +            }
 +            Thread.currentThread().setName(
 +                "Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Tablet: " + entry.getKey().toString());
 +
 +            LookupResult lookupResult;
 +            try {
 +
 +              // do the following check to avoid a race condition
 +              // between setting false below and the task being
 +              // canceled
 +              if (isCancelled())
 +                interruptFlag.set(true);
 +
 +              lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList,
 +                  session.ssio, interruptFlag);
 +
 +              // if the tablet was closed it it possible that the
 +              // interrupt flag was set.... do not want it set for
 +              // the next
 +              // lookup
 +              interruptFlag.set(false);
 +
 +            } catch (IOException e) {
 +              // rethrown unchecked; the outer catch publishes it as this task's result
 +              log.warn("lookup failed for tablet " + entry.getKey(), e);
 +              throw new RuntimeException(e);
 +            }
 +
 +            bytesAdded += lookupResult.bytesAdded;
 +
 +            if (lookupResult.unfinishedRanges.size() > 0) {
 +              if (lookupResult.closed) {
 +                // tablet closed mid-lookup: the client must retry the remainder elsewhere
 +                failures.put(entry.getKey(), lookupResult.unfinishedRanges);
 +              } else {
 +                // NOTE(review): this put is a structural change to session.queries while `iter` is live; it looks safe only if a partial
 +                // result always means the size limit was hit so the loop exits before iter.next() -- confirm
 +                session.queries.put(entry.getKey(), lookupResult.unfinishedRanges);
 +                partScan = entry.getKey();
 +                partNextKey = lookupResult.unfinishedRanges.get(0).getStartKey();
 +                partNextKeyInclusive = lookupResult.unfinishedRanges.get(0).isStartKeyInclusive();
 +              }
 +            } else {
 +              fullScans.add(entry.getKey());
 +            }
 +          }
 +
 +          long finishTime = System.currentTimeMillis();
 +          session.totalLookupTime += (finishTime - startTime);
 +          session.numEntries += results.size();
 +
 +          // convert everything to thrift before adding result
 +          List<TKeyValue> retResults = new ArrayList<TKeyValue>();
 +          for (KVEntry entry : results)
 +            retResults.add(new TKeyValue(entry.key.toThrift(), ByteBuffer.wrap(entry.value)));
 +          Map<TKeyExtent,List<TRange>> retFailures = Translator.translate(failures, Translator.KET, new Translator.ListTranslator<Range,TRange>(Translator.RT));
 +          List<TKeyExtent> retFullScans = Translator.translate(fullScans, Translator.KET);
 +          TKeyExtent retPartScan = null;
 +          TKey retPartNextKey = null;
 +          if (partScan != null) {
 +            retPartScan = partScan.toThrift();
 +            retPartNextKey = partNextKey.toThrift();
 +          }
 +          // add results to queue
 +          addResult(new MultiScanResult(retResults, retFailures, retFullScans, retPartScan, retPartNextKey, partNextKeyInclusive, session.queries.size() != 0));
 +        } catch (IterationInterruptedException iie) {
 +          // an interrupt caused by our own cancellation is expected and publishes nothing
 +          if (!isCancelled()) {
 +            log.warn("Iteration interrupted, when scan not cancelled", iie);
 +            addResult(iie);
 +          }
 +        } catch (Throwable e) {
 +          log.warn("exception while doing multi-scan ", e);
 +          addResult(e);
 +        } finally {
 +          Thread.currentThread().setName(oldThreadName);
 +          runState.set(ScanRunState.FINISHED);
 +        }
 +      }
 +    }
 +
 +    /**
 +     * Opens a single-tablet scan session and returns its id together with the first batch of results.
 +     * 
 +     * @throws ThriftSecurityException
 +     *           if the user may not scan the table (PERMISSION_DENIED) or lacks the requested authorizations (BAD_AUTHORIZATIONS)
 +     * @throws NotServingTabletException
 +     *           if this server is not serving the requested tablet
 +     */
 +    @Override
 +    public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize,
 +        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated,
 +        long readaheadThreshold) throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
 +
 +      if (!security.canScan(credentials, new String(textent.getTable()), range, columns, ssiList, ssio, authorizations))
 +        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +
 +      if (!security.userHasAuthorizations(credentials, authorizations))
 +        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
 +
 +      KeyExtent extent = new KeyExtent(textent);
 +
 +      // wait for any writes that are in flight.. this done to ensure
 +      // consistency across client restarts... assume a client writes
 +      // to accumulo and dies while waiting for a confirmation from
 +      // accumulo... the client process restarts and tries to read
 +      // data from accumulo making the assumption that it will get
 +      // any writes previously made... however if the server side thread
 +      // processing the write from the dead client is still in progress,
 +      // the restarted client may not see the write unless we wait here.
 +      // this behavior is very important when the client is reading the
 +      // metadata
 +      if (waitForWrites)
 +        writeTracker.waitForWrites(TabletType.type(extent));
 +
 +      Tablet tablet = onlineTablets.get(extent);
 +      if (tablet == null)
 +        throw new NotServingTabletException(textent);
 +
 +      ScanSession scanSession = new ScanSession();
 +      scanSession.user = credentials.getPrincipal();
 +      scanSession.extent = new KeyExtent(extent);
 +      scanSession.columnSet = new HashSet<Column>();
 +      scanSession.ssiList = ssiList;
 +      scanSession.ssio = ssio;
 +      scanSession.auths = new Authorizations(authorizations);
 +      scanSession.interruptFlag = new AtomicBoolean();
 +      scanSession.readaheadThreshold = readaheadThreshold;
 +
 +      for (TColumn tcolumn : columns) {
 +        scanSession.columnSet.add(new Column(tcolumn));
 +      }
 +
 +      scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet, scanSession.auths, ssiList, ssio, isolated,
 +          scanSession.interruptFlag);
 +
 +      // created reserved, so it cannot be removed as idle before the first batch is returned
 +      long sid = sessionManager.createSession(scanSession, true);
 +
 +      ScanResult scanResult;
 +      try {
 +        scanResult = continueScan(tinfo, sid, scanSession);
 +      } catch (NoSuchScanIDException e) {
 +        log.error("The impossible happened", e);
 +        // keep the message and the cause so the failure is diagnosable wherever this RuntimeException surfaces
 +        throw new RuntimeException("The impossible happened", e);
 +      } finally {
 +        sessionManager.unreserveSession(sid);
 +      }
 +
 +      return new InitialScan(sid, scanResult);
 +    }
 +
 +    /**
 +     * Returns the next batch of an existing scan session, holding the session's reservation for the duration of the call.
 +     * 
 +     * @throws NoSuchScanIDException
 +     *           if no session with this id can be reserved
 +     */
 +    @Override
 +    public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException,
 +        org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
 +      ScanSession reserved = (ScanSession) sessionManager.reserveSession(scanID);
 +      if (reserved == null)
 +        throw new NoSuchScanIDException();
 +
 +      try {
 +        return continueScan(tinfo, scanID, reserved);
 +      } finally {
 +        // release the reservation whether the batch succeeded or not
 +        sessionManager.unreserveSession(reserved);
 +      }
 +    }
 +
 +    /**
 +     * Fetches the next batch for a reserved scan session, scheduling a read-ahead task if none is pending.
 +     * 
 +     * If no batch arrives within MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, an empty result with {@code more == true} is returned and the task stays pending
 +     * for the next call. On failure or cancellation the session is removed and the error is mapped to a thrift exception where possible. Once the scan is
 +     * exhausted, the session is closed.
 +     */
 +    private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession) throws NoSuchScanIDException, NotServingTabletException,
 +        org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
 +
 +      // no read-ahead pending (first call, or readahead was not started for the previous batch): schedule one now
 +      if (scanSession.nextBatchTask == null) {
 +        scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
 +        resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
 +      }
 +
 +      ScanBatch bresult;
 +      try {
 +        bresult = scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
 +        scanSession.nextBatchTask = null;
 +      } catch (ExecutionException e) {
 +        // the task published a Throwable; map the known ones to thrift exceptions
 +        // (TooManyFilesException here is presumably the server-side type, since the thrift one is spelled out fully qualified)
 +        sessionManager.removeSession(scanID);
 +        if (e.getCause() instanceof NotServingTabletException)
 +          throw (NotServingTabletException) e.getCause();
 +        else if (e.getCause() instanceof TooManyFilesException)
 +          throw new org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(scanSession.extent.toThrift());
 +        else
 +          throw new RuntimeException(e);
 +      } catch (CancellationException ce) {
 +        // canceled: report NotServing if the tablet went away or closed, otherwise treat the scan id as gone
 +        sessionManager.removeSession(scanID);
 +        Tablet tablet = onlineTablets.get(scanSession.extent);
 +        if (tablet == null || tablet.isClosed())
 +          throw new NotServingTabletException(scanSession.extent.toThrift());
 +        else
 +          throw new NoSuchScanIDException();
 +      } catch (TimeoutException e) {
 +        // batch not ready yet: return an empty result flagged "more" and leave the task pending
 +        List<TKeyValue> param = Collections.emptyList();
 +        long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
 +        sessionManager.removeIfNotAccessed(scanID, timeout);
 +        return new ScanResult(param, true);
 +      } catch (Throwable t) {
 +        sessionManager.removeSession(scanID);
 +        log.warn("Failed to get next batch", t);
 +        throw new RuntimeException(t);
 +      }
 +
 +      ScanResult scanResult = new ScanResult(Key.compress(bresult.results), bresult.more);
 +
 +      scanSession.entriesReturned += scanResult.results.size();
 +
 +      scanSession.batchCount++;
 +
 +      // read-ahead only kicks in after more than readaheadThreshold batches; before that, the next call schedules the read itself
 +      if (scanResult.more && scanSession.batchCount > scanSession.readaheadThreshold) {
 +        // start reading next batch while current batch is transmitted
 +        // to client
 +        scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
 +        resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
 +      }
 +
 +      if (!scanResult.more)
 +        closeScan(tinfo, scanID);
 +
 +      return scanResult;
 +    }
 +
 +    /**
 +     * Removes the scan session, if it still exists, and logs and records its statistics. Unknown ids are ignored.
 +     */
 +    @Override
 +    public void closeScan(TInfo tinfo, long scanID) {
 +      ScanSession ss = (ScanSession) sessionManager.removeSession(scanID);
 +      if (ss == null)
 +        return;
 +
 +      long elapsed = System.currentTimeMillis() - ss.startTime;
 +      String tableId = ss.extent.getTableId().toString();
 +      log.debug(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ", TServerUtils.clientAddress.get(), tableId,
 +          ss.entriesReturned, elapsed / 1000.0, ss.nbTimes.toString()));
 +
 +      if (!scanMetrics.isEnabled())
 +        return;
 +      scanMetrics.add(TabletServerScanMetrics.scan, elapsed);
 +      scanMetrics.add(TabletServerScanMetrics.resultSize, ss.entriesReturned);
 +    }
 +
 +    /**
 +     * Opens a batch-scan session over tablets of a single table and returns its id together with the first result.
 +     * 
 +     * @throws IllegalArgumentException
 +     *           if {@code tbatch} is empty or spans more than one table
 +     * @throws ThriftSecurityException
 +     *           if the user may not scan the table (PERMISSION_DENIED) or lacks the requested authorizations (BAD_AUTHORIZATIONS)
 +     */
 +    @Override
 +    public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns,
 +        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws ThriftSecurityException {
 +      // find all of the tables that need to be scanned
 +      HashSet<String> tables = new HashSet<String>();
 +      for (TKeyExtent keyExtent : tbatch.keySet()) {
 +        tables.add(new String(keyExtent.getTable()));
 +      }
 +
 +      // checked separately so an empty batch is not misreported as spanning multiple tables
 +      if (tables.isEmpty())
 +        throw new IllegalArgumentException("Cannot batch scan without any tablets");
 +      if (tables.size() != 1)
 +        throw new IllegalArgumentException("Cannot batch scan over multiple tables");
 +
 +      // check if user has permission to the tables
 +      for (String table : tables)
 +        if (!security.canScan(credentials, table, tbatch, tcolumns, ssiList, ssio, authorizations))
 +          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +
 +      try {
 +        if (!security.userHasAuthorizations(credentials, authorizations))
 +          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
 +      } catch (ThriftSecurityException tse) {
 +        log.error(tse, tse);
 +        throw tse;
 +      }
 +      Map<KeyExtent,List<Range>> batch = Translator.translate(tbatch, new TKeyExtentTranslator(), new Translator.ListTranslator<TRange,Range>(
 +          new TRangeTranslator()));
 +
 +      // This is used to determine which thread pool to use
 +      KeyExtent threadPoolExtent = batch.keySet().iterator().next();
 +
 +      if (waitForWrites)
 +        writeTracker.waitForWrites(TabletType.type(batch.keySet()));
 +
 +      MultiScanSession mss = new MultiScanSession();
 +      mss.user = credentials.getPrincipal();
 +      mss.queries = batch;
 +      mss.columnSet = new HashSet<Column>(tcolumns.size());
 +      mss.ssiList = ssiList;
 +      mss.ssio = ssio;
 +      mss.auths = new Authorizations(authorizations);
 +
 +      mss.numTablets = batch.size();
 +      for (List<Range> ranges : batch.values()) {
 +        mss.numRanges += ranges.size();
 +      }
 +
 +      for (TColumn tcolumn : tcolumns)
 +        mss.columnSet.add(new Column(tcolumn));
 +
 +      mss.threadPoolExtent = threadPoolExtent;
 +
 +      // created reserved, so it cannot be removed as idle before the first result is returned
 +      long sid = sessionManager.createSession(mss, true);
 +
 +      MultiScanResult result;
 +      try {
 +        result = continueMultiScan(tinfo, sid, mss);
 +      } catch (NoSuchScanIDException e) {
 +        log.error("the impossible happened", e);
 +        throw new RuntimeException("the impossible happened", e);
 +      } finally {
 +        sessionManager.unreserveSession(sid);
 +      }
 +
 +      return new InitialMultiScan(sid, result);
 +    }
 +
 +    /**
 +     * Returns the next result of an existing batch-scan session, holding the session's reservation for the duration of the call.
 +     * 
 +     * @throws NoSuchScanIDException
 +     *           if no session with this id can be reserved
 +     */
 +    @Override
 +    public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
 +      MultiScanSession reserved = (MultiScanSession) sessionManager.reserveSession(scanID);
 +      if (reserved == null)
 +        throw new NoSuchScanIDException();
 +
 +      try {
 +        return continueMultiScan(tinfo, scanID, reserved);
 +      } finally {
 +        // release the reservation whether the lookup succeeded or not
 +        sessionManager.unreserveSession(reserved);
 +      }
 +    }
 +
 +    /**
 +     * Waits a bounded time for the session's lookup result, scheduling a lookup if none is pending. On timeout, an empty result flagged "more" is returned
 +     * and the lookup stays pending. Any other failure removes the session and is rethrown as a RuntimeException.
 +     */
 +    private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session) throws NoSuchScanIDException {
 +      // no lookup pending: schedule one on the read-ahead pool chosen by threadPoolExtent
 +      if (session.lookupTask == null) {
 +        session.lookupTask = new LookupTask(scanID);
 +        resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask);
 +      }
 +
 +      MultiScanResult scanResult;
 +      try {
 +        scanResult = session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
 +      } catch (TimeoutException notReady) {
 +        // result not ready: drop the session only if it goes unaccessed for the client timeout, and tell the client to ask again
 +        sessionManager.removeIfNotAccessed(scanID, acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT));
 +        List<TKeyValue> noResults = Collections.emptyList();
 +        Map<TKeyExtent,List<TRange>> noFailures = Collections.emptyMap();
 +        List<TKeyExtent> noFullScans = Collections.emptyList();
 +        return new MultiScanResult(noResults, noFailures, noFullScans, null, null, false, true);
 +      } catch (Throwable t) {
 +        sessionManager.removeSession(scanID);
 +        log.warn("Failed to get multiscan result", t);
 +        throw new RuntimeException(t);
 +      }
 +
 +      // result consumed; the next call schedules a fresh lookup
 +      session.lookupTask = null;
 +      return scanResult;
 +    }
 +
 +    /**
 +     * Removes a batch-scan session and logs its statistics.
 +     * 
 +     * @throws NoSuchScanIDException
 +     *           if no session with this id exists
 +     */
 +    @Override
 +    public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
 +      MultiScanSession removed = (MultiScanSession) sessionManager.removeSession(scanID);
 +      if (removed == null)
 +        throw new NoSuchScanIDException();
 +
 +      double elapsedSecs = (System.currentTimeMillis() - removed.startTime) / 1000.0;
 +      double lookupSecs = removed.totalLookupTime / 1000.0;
 +      log.debug(String.format("MultiScanSess %s %,d entries in %.2f secs (lookup_time:%.2f secs tablets:%,d ranges:%,d) ", TServerUtils.clientAddress.get(),
 +          removed.numEntries, elapsedSecs, lookupSecs, removed.numTablets, removed.numRanges));
 +    }
 +
 +    /**
 +     * Authenticates the caller and opens an update session for it.
 +     * 
 +     * @return the new session id
 +     */
 +    @Override
 +    public long startUpdate(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException {
 +      // Make sure user is real (presumably authenticateUser throws ThriftSecurityException otherwise -- confirm)
 +      security.authenticateUser(credentials, credentials);
 +
 +      // NOTE(review): adds 0 to the permission-error metric on the success path; looks odd, kept as-is
 +      if (updateMetrics.isEnabled())
 +        updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
 +
 +      UpdateSession session = new UpdateSession();
 +      session.violations = new Violations();
 +      session.credentials = credentials;
 +      session.cenv = new TservConstraintEnv(security, session.credentials);
 +
 +      // created unreserved
 +      return sessionManager.createSession(session, false);
 +    }
 +
 +    /**
 +     * Points the update session at the tablet for {@code keyExtent}, checking write permission when the table changes.
 +     * 
 +     * Unhosted tablets are recorded in {@code us.failures}; denied or unverifiable tables are recorded in {@code us.authFailures}. In both cases
 +     * {@code us.currentTablet} ends up null, so later mutations for that extent are not queued.
 +     */
 +    private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
 +      long t1 = System.currentTimeMillis();
 +
 +      // already writing to this tablet
 +      if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent))
 +        return;
 +
 +      if (us.currentTablet == null) {
 +        // if there were previous failures, then do not accept additional writes
 +        if (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))
 +          return;
 +      }
 +
 +      // if user has no permission to write to this table, add it to the failures list;
 +      // the check is skipped when staying within the current tablet's table
 +      boolean permitted;
 +      try {
 +        boolean sameTable = us.currentTablet != null && (us.currentTablet.getExtent().getTableId().equals(keyExtent.getTableId()));
 +        permitted = sameTable || security.canWrite(us.credentials, keyExtent.getTableId().toString());
 +      } catch (ThriftSecurityException e) {
 +        log.error("Denying permission to check user " + us.credentials.getPrincipal() + " with user " + e.getUser(), e);
 +        recordUpdateAuthFailure(us, keyExtent, e.getCode(), t1);
 +        return;
 +      }
 +
 +      if (!permitted) {
 +        log.warn("Denying access to table " + keyExtent.getTableId() + " for user " + us.credentials.getPrincipal());
 +        recordUpdateAuthFailure(us, keyExtent, SecurityErrorCode.PERMISSION_DENIED, t1);
 +        return;
 +      }
 +
 +      us.authTimes.addStat(System.currentTimeMillis() - t1);
 +      us.currentTablet = onlineTablets.get(keyExtent);
 +      if (us.currentTablet != null) {
 +        us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
 +      } else {
 +        // not serving tablet, so report all mutations as failures
 +        us.failures.put(keyExtent, 0L);
 +        if (updateMetrics.isEnabled())
 +          updateMetrics.add(TabletServerUpdateMetrics.unknownTabletErrors, 0);
 +      }
 +    }
 +
 +    // Records an authorization failure: accounts the auth time since t1, detaches the current tablet, remembers the code and bumps the permission metric.
 +    private void recordUpdateAuthFailure(UpdateSession us, KeyExtent keyExtent, SecurityErrorCode code, long t1) {
 +      us.authTimes.addStat(System.currentTimeMillis() - t1);
 +      us.currentTablet = null;
 +      us.authFailures.put(keyExtent, code);
 +      if (updateMetrics.isEnabled())
 +        updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
 +    }
 +
 +    @Override
 +    public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent, List<TMutation> tmutations) {
 +      UpdateSession us = (UpdateSession) sessionManager.reserveSession(updateID);
 +      if (us == null) {
 +        throw new RuntimeException("No Such SessionID");
 +      }
 +
 +      try {
 +        KeyExtent keyExtent = new KeyExtent(tkeyExtent);
 +        setUpdateTablet(us, keyExtent);
 +
 +        if (us.currentTablet != null) {
 +          List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
 +          for (TMutation tmutation : tmutations) {
 +            Mutation mutation = new ServerMutation(tmutation);
 +            mutations.add(mutation);
 +            us.queuedMutationSize += mutation.numBytes();
 +          }
 +          if (us.queuedMutationSize > getSystemConfiguration().getMemoryInBytes(Property.TSERV_MUTATION_QUEUE_MAX))
 +            flush(us);
 +        }
 +      } finally {
 +        sessionManager.unreserveSession(us);
 +      }
 +    }
 +
 +    /**
 +     * Prepares, write-ahead logs, and commits every mutation queued on an update session, then empties the queue.
 +     *
 +     * Unless a metadata tablet is among the queued tablets (or nothing is queued), this first blocks until the resource manager
 +     * allows commits. For each tablet with queued mutations:
 +     * <ul>
 +     * <li>A null commit session records the tablet in {@code us.failures}, with its successful-commit count so far, and clears it as
 +     * the current tablet.</li>
 +     * <li>On constraint violations, the violations are recorded and only the non-violating mutations are sent on.</li>
 +     * <li>If preparation fails with any other error, every already-prepared commit is aborted and the error is rethrown wrapped in a
 +     * RuntimeException.</li>
 +     * </ul>
 +     * Write-ahead log writes are retried indefinitely on IOException/FSError. Once the WAL/commit phase is entered, the queue is
 +     * always reset to an empty list for the current tablet (if any). {@code us.totalUpdates} is advanced only when that phase
 +     * completes.
 +     */
 +    private void flush(UpdateSession us) {
 +
 +      int mutationCount = 0;
 +      Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
 +      Throwable error = null;
 +
 +      long pt1 = System.currentTimeMillis();
 +
 +      // Skip the wait for commits to be enabled if any queued tablet is a metadata tablet, or if nothing is queued.
 +      boolean containsMetadataTablet = false;
 +      for (Tablet tablet : us.queuedMutations.keySet())
 +        if (tablet.getExtent().isMeta())
 +          containsMetadataTablet = true;
 +
 +      if (!containsMetadataTablet && us.queuedMutations.size() > 0)
 +        TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
 +
 +      Span prep = Trace.start("prep");
 +      try {
 +        for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
 +
 +          Tablet tablet = entry.getKey();
 +          List<Mutation> mutations = entry.getValue();
 +          if (mutations.size() > 0) {
 +            try {
 +              if (updateMetrics.isEnabled())
 +                updateMetrics.add(TabletServerUpdateMetrics.mutationArraySize, mutations.size());
 +
 +              CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations);
 +              if (commitSession == null) {
 +                // The tablet gave no commit session: report it as a failure, and stop treating it as the current tablet so later
 +                // batches for it are not queued.
 +                if (us.currentTablet == tablet) {
 +                  us.currentTablet = null;
 +                }
 +                us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
 +              } else {
 +                sendables.put(commitSession, mutations);
 +                mutationCount += mutations.size();
 +              }
 +
 +            } catch (TConstraintViolationException e) {
 +              us.violations.add(e.getViolations());
 +              if (updateMetrics.isEnabled())
 +                updateMetrics.add(TabletServerUpdateMetrics.constraintViolations, 0);
 +
 +              if (e.getNonViolators().size() > 0) {
 +                // Only log and commit mutations if some did not violate constraints; this is what
 +                // prepareMutationsForCommit() expects.
 +                sendables.put(e.getCommitSession(), e.getNonViolators());
 +              }
 +
 +              // Counts the whole batch, violators included.
 +              mutationCount += mutations.size();
 +
 +            } catch (HoldTimeoutException t) {
 +              error = t;
 +              log.debug("Giving up on mutations due to a long memory hold time");
 +              break;
 +            } catch (Throwable t) {
 +              error = t;
 +              log.error("Unexpected error preparing for commit", error);
 +              break;
 +            }
 +          }
 +        }
 +      } finally {
 +        prep.stop();
 +      }
 +
 +      long pt2 = System.currentTimeMillis();
 +      us.prepareTimes.addStat(pt2 - pt1);
 +      updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size());
 +
 +      if (error != null) {
 +        // Undo every commit that was already prepared before giving up.
 +        // NOTE(review): this throws before the try/finally below, so us.queuedMutations is NOT cleared on this path -- confirm
 +        // that is intended.
 +        for (Entry<CommitSession,List<Mutation>> e : sendables.entrySet()) {
 +          e.getKey().abortCommit(e.getValue());
 +        }
 +        throw new RuntimeException(error);
 +      }
 +      try {
 +        Span wal = Trace.start("wal");
 +        try {
 +          // Retry the write-ahead log write forever on IOException/FSError; any other Throwable aborts the flush.
 +          while (true) {
 +            try {
 +              long t1 = System.currentTimeMillis();
 +
 +              logger.logManyTablets(sendables);
 +
 +              long t2 = System.currentTimeMillis();
 +              us.walogTimes.addStat(t2 - t1);
 +              updateWalogWriteTime((t2 - t1));
 +              break;
 +            } catch (IOException ex) {
 +              log.warn("logging mutations failed, retrying");
 +            } catch (FSError ex) { // happens when DFS is localFS
 +              log.warn("logging mutations failed, retrying");
 +            } catch (Throwable t) {
 +              log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t);
 +              throw new RuntimeException(t);
 +            }
 +          }
 +        } finally {
 +          wal.stop();
 +        }
 +
 +        Span commit = Trace.start("commit");
 +        try {
 +          long t1 = System.currentTimeMillis();
 +          for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
 +            CommitSession commitSession = entry.getKey();
 +            List<Mutation> mutations = entry.getValue();
 +
 +            commitSession.commit(mutations);
 +
 +            Tablet tablet = commitSession.getTablet();
 +
 +            if (tablet == us.currentTablet) {
 +              // Constraint violations may have filtered out some mutations. For proper accounting with the client code,
 +              // increment by the original number of mutations from the client, NOT the filtered number.
 +              us.successfulCommits.increment(tablet, us.queuedMutations.get(tablet).size());
 +            }
 +          }
 +          long t2 = System.currentTimeMillis();
 +
 +          us.flushTime += (t2 - pt1);
 +          us.commitTimes.addStat(t2 - t1);
 +
 +          updateAvgCommitTime(t2 - t1, sendables.size());
 +        } finally {
 +          commit.stop();
 +        }
 +      } finally {
 +        // Reset the queue whether or not logging/commit succeeded, keeping an empty list ready for the current tablet.
 +        us.queuedMutations.clear();
 +        if (us.currentTablet != null) {
 +          us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
 +        }
 +        us.queuedMutationSize = 0;
 +      }
 +      us.totalUpdates += mutationCount;
 +    }
 +
 +    /** Records one write-ahead log write duration (callers in this class pass milliseconds) when update metrics are enabled. */
 +    private void updateWalogWriteTime(long time) {
 +      if (!updateMetrics.isEnabled()) {
 +        return;
 +      }
 +      updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, time);
 +    }
 +
 +    /**
 +     * Records the average commit time per commit session when update metrics are enabled.
 +     *
 +     * @param time total elapsed commit time (callers in this class pass milliseconds)
 +     * @param size number of commit sessions the time was spread over; samples with no sessions are skipped
 +     */
 +    private void updateAvgCommitTime(long time, int size) {
 +      // When nothing reached the commit phase, callers pass size == 0. Dividing by it would record Long.MAX_VALUE
 +      // (or 0 for 0/0.0 == NaN), so skip the sample rather than pollute the metric.
 +      if (size <= 0 || !updateMetrics.isEnabled())
 +        return;
 +      updateMetrics.add(TabletServerUpdateMetrics.commitTime, (long) (time / (double) size));
 +    }
 +
 +    /**
 +     * Records the average prepare time per tablet/extent when update metrics are enabled.
 +     *
 +     * @param time total elapsed prepare time (callers in this class pass milliseconds)
 +     * @param size number of tablets/extents the time was spread over; samples with none are skipped
 +     */
 +    private void updateAvgPrepTime(long time, int size) {
 +      // A flush or conditional write with nothing queued passes size == 0. Dividing by it would record Long.MAX_VALUE
 +      // (or 0 for 0/0.0 == NaN), so skip the sample rather than pollute the metric.
 +      if (size <= 0 || !updateMetrics.isEnabled())
 +        return;
 +      updateMetrics.add(TabletServerUpdateMetrics.commitPrep, (long) (time / (double) size));
 +    }
 +
 +    /**
 +     * Closes an update session. Flushes whatever is still queued (registered with the write tracker so reads wait for it), logs a
 +     * summary of the session, and returns the failures, constraint violations, and authorization failures it collected.
 +     *
 +     * @throws NoSuchScanIDException if no update session exists for {@code updateID}
 +     */
 +    @Override
 +    public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException {
 +      UpdateSession us = (UpdateSession) sessionManager.removeSession(updateID);
 +      if (us == null) {
 +        throw new NoSuchScanIDException();
 +      }
 +
 +      // clients may or may not see data from an update session while
 +      // it is in progress, however when the update session is closed
 +      // want to ensure that reads wait for the write to finish
 +      long opid = writeTracker.startWrite(us.queuedMutations.keySet());
 +
 +      try {
 +        flush(us);
 +      } finally {
 +        writeTracker.finishWrite(opid);
 +      }
 +
 +      log.debug(String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)", TServerUtils.clientAddress.get(), us.totalUpdates,
 +          (System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(), us.flushTime / 1000.0, us.prepareTimes.getSum() / 1000.0,
 +          us.walogTimes.getSum() / 1000.0, us.commitTimes.getSum() / 1000.0));
 +      if (us.failures.size() > 0) {
 +        Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next();
 +        log.debug(String.format("Failures: %d, first extent %s successful commits: %d", us.failures.size(), first.getKey().toString(), first.getValue()));
 +      }
 +      // Materialize the violation list once and reuse it for both the log line and the returned errors.
 +      List<ConstraintViolationSummary> violations = us.violations.asList();
 +      if (!violations.isEmpty()) {
 +        ConstraintViolationSummary first = violations.get(0);
 +        log.debug(String.format("Violations: %d, first %s occurs %d", violations.size(), first.violationDescription, first.numberOfViolatingMutations));
 +      }
 +      if (us.authFailures.size() > 0) {
 +        KeyExtent first = us.authFailures.keySet().iterator().next();
 +        log.debug(String.format("Authentication Failures: %d, first %s", us.authFailures.size(), first.toString()));
 +      }
 +
 +      return new UpdateErrors(Translator.translate(us.failures, Translator.KET), Translator.translate(violations, Translator.CVST), Translator.translate(
 +          us.authFailures, Translator.KET));
 +    }
 +
 +    /**
 +     * Applies a single mutation to one tablet synchronously: permission check, prepare, write-ahead log, then commit.
 +     *
 +     * @throws ThriftSecurityException if the caller may not write to the extent's table
 +     * @throws NotServingTabletException if this server does not host the tablet, or the tablet returns no commit session
 +     * @throws ConstraintViolationException if the mutation violates a constraint of the table
 +     */
 +    @Override
 +    public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, TMutation tmutation) throws NotServingTabletException,
 +        ConstraintViolationException, ThriftSecurityException {
 +
 +      KeyExtent keyExtent = new KeyExtent(tkeyExtent);
 +      // Use the table id as decoded by KeyExtent/Text (UTF-8), the same form setUpdateTablet passes to canWrite. Avoid
 +      // new String(byte[]), which decodes with the platform default charset.
 +      if (!security.canWrite(credentials, keyExtent.getTableId().toString()))
 +        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +
 +      Tablet tablet = onlineTablets.get(keyExtent);
 +      if (tablet == null) {
 +        throw new NotServingTabletException(tkeyExtent);
 +      }
 +
 +      // metadata writes do not wait for commits to be enabled
 +      if (!keyExtent.isMeta())
 +        TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
 +
 +      long opid = writeTracker.startWrite(TabletType.type(keyExtent));
 +
 +      try {
 +        Mutation mutation = new ServerMutation(tmutation);
 +        List<Mutation> mutations = Collections.singletonList(mutation);
 +
 +        Span prep = Trace.start("prep");
 +        CommitSession cs;
 +        try {
 +          cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials), mutations);
 +        } finally {
 +          prep.stop();
 +        }
 +        if (cs == null) {
 +          throw new NotServingTabletException(tkeyExtent);
 +        }
 +
 +        // retry the write-ahead log write until it succeeds
 +        while (true) {
 +          try {
 +            Span wal = Trace.start("wal");
 +            try {
 +              logger.log(cs, cs.getWALogSeq(), mutation);
 +            } finally {
 +              wal.stop();
 +            }
 +            break;
 +          } catch (IOException ex) {
 +            log.warn(ex, ex);
 +          }
 +        }
 +
 +        Span commit = Trace.start("commit");
 +        try {
 +          cs.commit(mutations);
 +        } finally {
 +          commit.stop();
 +        }
 +      } catch (TConstraintViolationException e) {
 +        throw new ConstraintViolationException(Translator.translate(e.getViolations().asList(), Translator.CVST));
 +      } finally {
 +        writeTracker.finishWrite(opid);
 +      }
 +    }
 +
 +    /**
 +     * Evaluates the conditions of every queued conditional mutation, narrowing {@code updates} down to the mutations whose
 +     * conditions all held.
 +     *
 +     * Extents whose tablet is missing or closed are removed from {@code updates}, with each of their mutations reported as IGNORED.
 +     * Mutations that fail a check have their result appended to {@code results} by checkCondition.
 +     */
 +    private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession cs,
 +        List<String> symbols) throws IOException {
 +      CompressedIterators compressedIters = new CompressedIterators(symbols);
 +
 +      Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> extents = updates.entrySet().iterator();
 +      while (extents.hasNext()) {
 +        Entry<KeyExtent,List<ServerConditionalMutation>> extentEntry = extents.next();
 +        List<ServerConditionalMutation> pending = extentEntry.getValue();
 +        Tablet tablet = onlineTablets.get(extentEntry.getKey());
 +
 +        if (tablet != null && !tablet.isClosed()) {
 +          List<ServerConditionalMutation> passed = new ArrayList<ServerConditionalMutation>(pending.size());
 +          for (ServerConditionalMutation candidate : pending) {
 +            if (checkCondition(results, cs, compressedIters, tablet, candidate))
 +              passed.add(candidate);
 +          }
 +          extentEntry.setValue(passed);
 +          continue;
 +        }
 +
 +        // tablet is gone or closed: none of this extent's mutations can be evaluated
 +        for (ServerConditionalMutation candidate : pending)
 +          results.add(new TCMResult(candidate.getID(), TCMStatus.IGNORED));
 +        extents.remove();
 +      }
 +    }
 +
 +    boolean checkCondition(ArrayList<TCMResult> results, ConditionalSession cs, CompressedIterators compressedIters, Tablet tablet,
 +        ServerConditionalMutation scm) throws IOException {
 +      boolean add = true;
 +
 +      Set<Column> emptyCols = Collections.emptySet();
 +
 +      for (TCondition tc : scm.getConditions()) {
 +
 +        Range range;
 +        if (tc.hasTimestamp)
 +          range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
 +        else
 +          range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()));
 +
 +        IterConfig ic = compressedIters.decompress(tc.iterators);
 +
 +        Scanner scanner = tablet.createScanner(range, 1, emptyCols, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag);
 +
 +        try {
 +          ScanBatch batch = scanner.read();
 +
 +          Value val = null;
 +
 +          for (KVEntry entry2 : batch.results) {
 +            val = entry2.getValue();
 +            break;
 +          }
 +
 +          if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) {
 +            results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
 +            add = false;
 +            break;
 +          }
 +
 +        } catch (TabletClosedException e) {
 +          results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
 +          add = false;
 +          break;
 +        } catch (IterationInterruptedException iie) {
 +          results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
 +          add = false;
 +          break;
 +        } catch (TooManyFilesException tmfe) {
 +          results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
 +          add = false;
 +          break;
 +        }
 +      }
 +      return add;
 +    }
 +
 +    /**
 +     * Writes the conditional mutations that survived condition checking: prepares them per tablet, logs them to the write-ahead
 +     * log, then commits them.
 +     *
 +     * Results are appended to {@code results} during the prepare phase:
 +     * <ul>
 +     * <li>IGNORED when the tablet is missing or closed, the session was already invalidated when this method started, or the tablet
 +     * returned no commit session;</li>
 +     * <li>VIOLATED for mutations that broke a constraint;</li>
 +     * <li>ACCEPTED for everything that is sent on to be logged and committed.</li>
 +     * </ul>
 +     */
 +    private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession sess) {
 +      Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
 +
 +      Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
 +
 +      // read once: if the session is invalidated mid-way, mutations already being prepared here are still processed
 +      boolean sessionCanceled = sess.interruptFlag.get();
 +
 +      Span prepSpan = Trace.start("prep");
 +      try {
 +        long t1 = System.currentTimeMillis();
 +        for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
 +          Tablet tablet = onlineTablets.get(entry.getKey());
 +          if (tablet == null || tablet.isClosed() || sessionCanceled) {
 +            for (ServerConditionalMutation scm : entry.getValue())
 +              results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
 +          } else {
 +            try {
 +
 +              @SuppressWarnings("unchecked")
 +              List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue();
 +              if (mutations.size() > 0) {
 +
 +                CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, sess.credentials), mutations);
 +
 +                if (cs == null) {
 +                  for (ServerConditionalMutation scm : entry.getValue())
 +                    results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
 +                } else {
 +                  for (ServerConditionalMutation scm : entry.getValue())
 +                    results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
 +                  sendables.put(cs, mutations);
 +                }
 +              }
 +            } catch (TConstraintViolationException e) {
 +              if (e.getNonViolators().size() > 0) {
 +                sendables.put(e.getCommitSession(), e.getNonViolators());
 +                for (Mutation m : e.getNonViolators())
 +                  results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED));
 +              }
 +
 +              for (Mutation m : e.getViolators())
 +                results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.VIOLATED));
 +            }
 +          }
 +        }
 +
 +        long t2 = System.currentTimeMillis();
 +        updateAvgPrepTime(t2 - t1, es.size());
 +      } finally {
 +        prepSpan.stop();
 +      }
 +
 +      Span walSpan = Trace.start("wal");
 +      try {
 +        // Skip logging when nothing was prepared; otherwise retry forever on IOException/FSError until the write succeeds.
 +        while (!sendables.isEmpty()) {
 +          try {
 +            long t1 = System.currentTimeMillis();
 +            logger.logManyTablets(sendables);
 +            long t2 = System.currentTimeMillis();
 +            updateWalogWriteTime(t2 - t1);
 +            break;
 +          } catch (IOException ex) {
 +            log.warn("logging mutations failed, retrying");
 +          } catch (FSError ex) { // happens when DFS is localFS
 +            log.warn("logging mutations failed, retrying");
 +          } catch (Throwable t) {
 +            log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t);
 +            throw new RuntimeException(t);
 +          }
 +        }
 +      } finally {
 +        walSpan.stop();
 +      }
 +
 +      Span commitSpan = Trace.start("commit");
 +      try {
 +        long t1 = System.currentTimeMillis();
 +        for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
 +          CommitSession commitSession = entry.getKey();
 +          List<Mutation> mutations = entry.getValue();
 +
 +          commitSession.commit(mutations);
 +        }
 +        long t2 = System.currentTimeMillis();
 +        updateAvgCommitTime(t2 - t1, sendables.size());
 +      } finally {
 +        commitSpan.stop();
 +      }
 +
 +    }
 +
 +    /**
 +     * Runs one round of conditional updates:
 +     * <ol>
 +     * <li>sorts the mutations;</li>
 +     * <li>postpones duplicate rows and rows whose locks cannot be taken immediately;</li>
 +     * <li>while holding the acquired row locks, checks conditions and writes the survivors.</li>
 +     * </ol>
 +     *
 +     * @return the mutations postponed to a later round
 +     */
 +    private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(ConditionalSession cs, Map<KeyExtent,List<ServerConditionalMutation>> updates,
 +        ArrayList<TCMResult> results, List<String> symbols) throws IOException {
 +      // Sorting avoids deadlock, makes seeks run in order (more efficient), and lets duplicate rows be detected.
 +      ConditionalMutationSet.sortConditionalMutations(updates);
 +
 +      Map<KeyExtent,List<ServerConditionalMutation>> postponed = new HashMap<KeyExtent,List<ServerConditionalMutation>>();
 +
 +      // Two mutations for the same row cannot be processed together: one would not see what the other writes.
 +      ConditionalMutationSet.deferDuplicatesRows(updates, postponed);
 +
 +      // Take as many row locks as possible without blocking; rows that are already locked are postponed too.
 +      List<RowLock> heldLocks = rowLocks.acquireRowlocks(updates, postponed);
 +      try {
 +        Span checkSpan = Trace.start("Check conditions");
 +        try {
 +          checkConditions(updates, results, cs, symbols);
 +        } finally {
 +          checkSpan.stop();
 +        }
 +
 +        Span writeSpan = Trace.start("apply conditional mutations");
 +        try {
 +          writeConditionalMutations(updates, results, cs);
 +        } finally {
 +          writeSpan.stop();
 +        }
 +      } finally {
 +        rowLocks.releaseRowLocks(heldLocks);
 +      }
 +      return postponed;
 +    }
 +
 +    /**
 +     * Opens a conditional update session for one table and returns its handle.
 +     *
 +     * @throws ThriftSecurityException with one of these codes:
 +     *           <ul>
 +     *           <li>PERMISSION_DENIED if the caller may not conditionally update the table;</li>
 +     *           <li>BAD_AUTHORIZATIONS if a requested authorization is not held by the caller.</li>
 +     *           </ul>
 +     */
 +    @Override
 +    public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID)
 +        throws ThriftSecurityException, TException {
 +
 +      if (!security.canConditionallyUpdate(credentials, tableID, authorizations))
 +        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +
 +      // every authorization the session will scan with must be one the user actually holds
 +      Authorizations granted = security.getUserAuthorizations(credentials);
 +      for (ByteBuffer requested : authorizations) {
 +        if (!granted.contains(ByteBufferUtil.toBytes(requested)))
 +          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
 +      }
 +
 +      ConditionalSession session = new ConditionalSession();
 +      session.auths = new Authorizations(authorizations);
 +      session.credentials = credentials;
 +      session.tableId = tableID;
 +      session.interruptFlag = new AtomicBoolean();
 +
 +      long sessionId = sessionManager.createSession(session, false);
 +      return new TConditionalSession(sessionId, lockID, sessionManager.getMaxIdleTime());
 +    }
 +
 +    /**
 +     * Applies a batch of conditional mutations on a previously started conditional session. Rounds are repeated until no mutation
 +     * is postponed, and the collected results are returned.
 +     *
 +     * @throws NoSuchScanIDException if the session does not exist or has been invalidated
 +     */
 +    @Override
 +    public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
 +        throws NoSuchScanIDException, TException {
 +
 +      ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
 +      if (cs == null)
 +        throw new NoSuchScanIDException();
 +
 +      // From here on the session is reserved, so every exit path must release it. Previously, an invalidated session (or an
 +      // exception before the write started) left it reserved, and invalidateConditionalUpdate, which reserves it with
 +      // wait=true, could then block on it.
 +      try {
 +        if (cs.interruptFlag.get())
 +          throw new NoSuchScanIDException();
 +
 +        if (!cs.tableId.equals(MetadataTable.ID) && !cs.tableId.equals(RootTable.ID))
 +          TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
 +
 +        Text tid = new Text(cs.tableId);
 +        long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
 +
 +        try {
 +          Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations, Translator.TKET,
 +              new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(ServerConditionalMutation.TCMT));
 +
 +          for (KeyExtent ke : updates.keySet())
 +            if (!ke.getTableId().equals(tid))
 +              throw new IllegalArgumentException("Unexpected table id " + tid + " != " + ke.getTableId());
 +
 +          ArrayList<TCMResult> results = new ArrayList<TCMResult>();
 +
 +          Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(cs, updates, results, symbols);
 +
 +          while (deferred.size() > 0) {
 +            deferred = conditionalUpdate(cs, deferred, results, symbols);
 +          }
 +
 +          return results;
 +        } catch (IOException ioe) {
 +          throw new TException(ioe);
 +        } finally {
 +          writeTracker.finishWrite(opid);
 +        }
 +      } finally {
 +        sessionManager.unreserveSession(sessID);
 +      }
 +    }
 +
 +    /**
 +     * Invalidates a conditional update session so that no further conditional update can run on it, then removes it.
 +     */
 +    @Override
 +    public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {
 +      // this method should wait for any running conditional update to complete
 +      // after this method returns a conditional update should not be able to start
 +
 +      // Step 1: raise the interrupt flag; conditionalUpdate refuses flagged sessions and scans observe the flag.
 +      ConditionalSession existing = (ConditionalSession) sessionManager.getSession(sessID);
 +      if (existing != null)
 +        existing.interruptFlag.set(true);
 +
 +      // Step 2: reserve the session (the `true` argument presumably makes this wait for a running update to release it),
 +      // then remove it.
 +      ConditionalSession reserved = (ConditionalSession) sessionManager.reserveSession(sessID, true);
 +      if (reserved != null)
 +        sessionManager.removeSession(sessID, true);
 +    }
 +
 +    /** Closes a conditional update session by removing it from the session manager. */
 +    @Override
 +    public void closeConditionalUpdate(TInfo tinfo, long sessID) throws TException {
 +      // NOTE(review): assuming the boolean means "unreserve". invalidateConditionalUpdate passes true after reserving; here the
 +      // session was never reserved by this call.
 +      final boolean unreserve = false;
 +      sessionManager.removeSession(sessID, unreserve);
 +    }
 +
 +    /**
 +     * Splits a tablet hosted here at {@code splitPoint}. Nothing is done when the split point equals the tablet's end row.
 +     *
 +     * @throws ThriftSecurityException if the caller may not split tablets of this table
 +     * @throws NotServingTabletException if the tablet is not hosted here, or the split returned no result
 +     */
 +    @Override
 +    public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint) throws NotServingTabletException,
 +        ThriftSecurityException {
 +
 +      KeyExtent keyExtent = new KeyExtent(tkeyExtent);
 +      // Take the table id from the KeyExtent (decoded as UTF-8 by Text), as setUpdateTablet does. Avoid
 +      // new String(bytes), which would use the platform default charset.
 +      String tableId = keyExtent.getTableId().toString();
 +      if (!security.canSplitTablet(credentials, tableId))
 +        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +
 +      Tablet tablet = onlineTablets.get(keyExtent);
 +      if (tablet == null) {
 +        throw new NotServingTabletException(tkeyExtent);
 +      }
 +
 +      if (keyExtent.getEndRow() == null || !keyExtent.getEndRow().equals(ByteBufferUtil.toText(splitPoint))) {
 +        try {
 +          if (TabletServer.this.splitTablet(tablet, ByteBufferUtil.toBytes(splitPoint)) == null) {
 +            throw new NotServingTabletException(tkeyExtent);
 +          }
 +        } catch (IOException e) {
 +          log.warn("Failed to split " + keyExtent, e);
 +          throw new RuntimeException(e);
 +        }
 +      }
 +    }
 +
 +    /**
 +     * Returns this tablet server's status, built from the per-table counts of active scans.
 +     *
 +     * NOTE(review): {@code credentials} is not checked here -- confirm that unauthenticated status requests are intended.
 +     */
 +    @Override
 +    public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
 +      return getStats(sessionManager.getActiveScansPerTable());
 +    }
 +
 +    /**
 +     * Returns statistics for every tablet of {@code tableId} hosted on this server.
 +     *
 +     * A copy of the online tablet map is taken under its lock, so statistics are gathered without holding that lock.
 +     */
 +    @Override
 +    public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) throws ThriftSecurityException, TException {
 +      TreeMap<KeyExtent,Tablet> snapshot;
 +      synchronized (onlineTablets) {
 +        snapshot = new TreeMap<KeyExtent,Tablet>(onlineTablets);
 +      }
 +
 +      List<TabletStats> collected = new ArrayList<TabletStats>();
 +      Text table = new Text(tableId);
 +      // tailMap start point; presumably sorts at or before every extent of this table. Extents of other tables are skipped below.
 +      KeyExtent firstOfTable = new KeyExtent(table, new Text(), null);
 +
 +      for (Entry<KeyExtent,Tablet> candidate : snapshot.tailMap(firstOfTable).entrySet()) {
 +        KeyExtent extent = candidate.getKey();
 +        if (extent.getTableId().compareTo(table) != 0)
 +          continue;
 +
 +        Tablet tablet = candidate.getValue();
 +        TabletStats tabletStats = tablet.timer.getTabletStats();
 +        tabletStats.extent = extent.toThrift();
 +        tabletStats.ingestRate = tablet.ingestRate();
 +        tabletStats.queryRate = tablet.queryRate();
 +        tabletStats.splitCreationTime = tablet.getSplitCreationTime();
 +        tabletStats.numEntries = tablet.getNumEntries();
 +        collected.add(tabletStats);
 +      }
 +      return collected;
 +    }
 +
 +    private ZooCache masterLockCache = new ZooCache();
 +
 +    /**
 +     * Verifies that a request requiring system permission may be served.
 +     * <ul>
 +     * <li>The caller must have system permission; otherwise a ThriftSecurityException is thrown. If the caller presented the same
 +     * token class as this server's system credentials, the server halts, since that indicates a peer service with a mismatched
 +     * configuration.</li>
 +     * <li>This server must have acquired its tablet server lock (RuntimeException otherwise). It halts if it acquired the lock but
 +     * no longer holds it.</li>
 +     * <li>When {@code lock} is non-null, it must be the current master's ZooKeeper lock (RuntimeException "bad master lock"
 +     * otherwise).</li>
 +     * </ul>
 +     */
 +    private void checkPermission(TCredentials credentials, String lock, final String request) throws ThriftSecurityException {
 +      boolean fatal = false;
 +      try {
 +        log.debug("Got " + request + " message from user: " + credentials.getPrincipal());
 +        if (!security.canPerformSystemActions(credentials)) {
 +          log.warn("Got " + request + " message from user: " + credentials.getPrincipal());
 +          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 +        }
 +      } catch (ThriftSecurityException e) {
 +        // NOTE: this also catches the PERMISSION_DENIED thrown just above, so denied users are logged here as well.
 +        log.warn("Got " + request + " message from unauthenticatable user: " + e.getUser());
 +        if (SystemCredentials.get().getToken().getClass().getName().equals(credentials.getTokenClassName())) {
 +          log.fatal("Got message from a service with a mismatched configuration. Please ensure a compatible configuration.", e);
 +          fatal = true;
 +        }
 +        throw e;
 +      } finally {
 +        if (fatal) {
 +          Halt.halt(1, new Runnable() {
 +            @Override
 +            public void run() {
 +              logGCInfo(getSystemConfiguration());
 +            }
 +          });
 +        }
 +      }
 +
 +      if (tabletServerLock == null || !tabletServerLock.wasLockAcquired()) {
 +        log.warn("Got " + request + " message from master before lock acquired, ignoring...");
 +        throw new RuntimeException("Lock not acquired");
 +      }
 +
 +      // The lock was acquired at some point but is no longer held: halt rather than act without it.
 +      if (tabletServerLock != null && tabletServerLock.wasLockAcquired() && !tabletServerLock.isLocked()) {
 +        Halt.halt(1, new Runnable() {
 +          @Override
 +          public void run() {
 +            log.info("Tablet server no longer holds lock during checkPermission() : " + request + ", exiting");
 +            logGCInfo(getSystemConfiguration());
 +          }
 +        });
 +      }
 +
 +      if (lock != null) {
 +        ZooUtil.LockID lid = new ZooUtil.LockID(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK, lock);
 +
 +        boolean held;
 +        try {
 +          held = ZooLock.isLockHeld(masterLockCache, lid);
 +          if (!held) {
 +            // maybe the cache is out of date and a new master holds the lock?
 +            masterLockCache.clear();
 +            held = ZooLock.isLockHeld(masterLockCache, lid);
 +          }
 +        } catch (Exception e) {
 +          if (e instanceof InterruptedException)
 +            Thread.currentThread().interrupt();
 +          throw new RuntimeException("bad master lock", e);
 +        }
 +
 +        // Checked outside the try so this exception is not caught and wrapped in a second "bad master lock" RuntimeException.
 +        if (!held) {
 +          log.warn("Got " + request + " message from a master that does not hold the current lock " + lock);
 +          throw new RuntimeException("bad master lock");
 +        }
 +      }
 +    }
 +
 +    @Override
 +    public void loadTablet(

<TRUNCATED>

[3/4] git commit: Merge remote-tracking branch 'origin/1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Posted by vi...@apache.org.
Merge remote-tracking branch 'origin/1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Conflicts:
	server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3f92e012
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3f92e012
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3f92e012

Branch: refs/heads/master
Commit: 3f92e0129628ab80ac979c0258278b31dd40dae6
Parents: 550d8ac 16e095f
Author: John Vines <vi...@apache.org>
Authored: Tue Jan 28 18:15:42 2014 -0500
Committer: John Vines <vi...@apache.org>
Committed: Tue Jan 28 18:15:42 2014 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/accumulo/tserver/TabletServer.java  | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------