You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2016/01/28 03:01:43 UTC

[4/5] accumulo git commit: Merge branch '1.6' into 1.7

Merge branch '1.6' into 1.7

Conflicts:
	core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
	core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
	server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
	server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d4882a15
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d4882a15
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d4882a15

Branch: refs/heads/master
Commit: d4882a15fd85482855783e3a56babc14d31bb5bb
Parents: 0faf8b9 21d2f61
Author: Keith Turner <kt...@apache.org>
Authored: Wed Jan 27 19:57:22 2016 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Wed Jan 27 19:57:22 2016 -0500

----------------------------------------------------------------------
 .../core/client/impl/CompressedIterators.java   |  14 +-
 .../core/client/impl/ConditionalWriterImpl.java |  44 ++++-
 .../accumulo/core/iterators/IteratorUtil.java   |  93 ++++++-----
 .../client/impl/ConditionalComparatorTest.java  |  53 ++++++
 .../tserver/ConditionCheckerContext.java        | 164 +++++++++++++++++++
 .../apache/accumulo/tserver/TabletServer.java   |  76 ++-------
 .../accumulo/tserver/tablet/ScanDataSource.java |  23 ++-
 .../apache/accumulo/tserver/tablet/Tablet.java  |  18 ++
 .../accumulo/test/ConditionalWriterIT.java      | 137 +++++++++++++++-
 9 files changed, 507 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d4882a15/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index 24040e6,9030d77..c7756ad
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@@ -17,11 -17,11 +17,13 @@@
  
  package org.apache.accumulo.core.client.impl;
  
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
  import java.nio.ByteBuffer;
  import java.util.ArrayList;
+ import java.util.Arrays;
  import java.util.Collections;
+ import java.util.Comparator;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Iterator;
@@@ -69,14 -69,13 +71,15 @@@ import org.apache.accumulo.core.securit
  import org.apache.accumulo.core.security.VisibilityParseException;
  import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
  import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.accumulo.core.trace.Trace;
 +import org.apache.accumulo.core.trace.Tracer;
 +import org.apache.accumulo.core.trace.thrift.TInfo;
  import org.apache.accumulo.core.util.BadArgumentException;
  import org.apache.accumulo.core.util.ByteBufferUtil;
 -import org.apache.accumulo.core.util.LoggingRunnable;
+ import org.apache.accumulo.core.util.NamingThreadFactory;
 -import org.apache.accumulo.core.util.ThriftUtil;
  import org.apache.accumulo.core.util.UtilWaitThread;
  import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.util.LoggingRunnable;
  import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
  import org.apache.accumulo.fate.zookeeper.ZooLock;
  import org.apache.accumulo.fate.zookeeper.ZooUtil.LockID;
@@@ -379,12 -373,13 +382,12 @@@ class ConditionalWriterImpl implements 
      }
    }
  
 -  ConditionalWriterImpl(Instance instance, Credentials credentials, String tableId, ConditionalWriterConfig config) {
 -    this.instance = instance;
 -    this.credentials = credentials;
 +  ConditionalWriterImpl(ClientContext context, String tableId, ConditionalWriterConfig config) {
 +    this.context = context;
      this.auths = config.getAuthorizations();
      this.ve = new VisibilityEvaluator(config.getAuthorizations());
-     this.threadPool = new ScheduledThreadPoolExecutor(config.getMaxWriteThreads());
+     this.threadPool = new ScheduledThreadPoolExecutor(config.getMaxWriteThreads(), new NamingThreadFactory(this.getClass().getSimpleName()));
 -    this.locator = TabletLocator.getLocator(instance, new Text(tableId));
 +    this.locator = TabletLocator.getLocator(context, new Text(tableId));
      this.serverQueues = new HashMap<String,ServerQueue>();
      this.tableId = tableId;
      this.timeout = config.getTimeout(TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d4882a15/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
index 031d13f,6f76d77..c739e56
--- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
@@@ -124,22 -109,27 +125,27 @@@ public class IteratorUtil 
      return props;
    }
  
-   public static int getMaxPriority(IteratorScope scope, AccumuloConfiguration conf) {
-     List<IterInfo> iters = new ArrayList<IterInfo>();
-     parseIterConf(scope, iters, new HashMap<String,Map<String,String>>(), conf);
+   public static void mergeIteratorConfig(List<IterInfo> destList, Map<String,Map<String,String>> destOpts, List<IterInfo> tableIters,
+       Map<String,Map<String,String>> tableOpts, List<IterInfo> ssi, Map<String,Map<String,String>> ssio) {
+     destList.addAll(tableIters);
+     destList.addAll(ssi);
+     Collections.sort(destList, new IterInfoComparator());
  
-     int max = 0;
- 
-     for (IterInfo iterInfo : iters) {
-       if (iterInfo.priority > max)
-         max = iterInfo.priority;
+     Set<Entry<String,Map<String,String>>> es = tableOpts.entrySet();
+     for (Entry<String,Map<String,String>> entry : es) {
+       if (entry.getValue() == null) {
+         destOpts.put(entry.getKey(), null);
+       } else {
 -        destOpts.put(entry.getKey(), new HashMap<String,String>(entry.getValue()));
++        destOpts.put(entry.getKey(), new HashMap<>(entry.getValue()));
+       }
      }
  
-     return max;
+     IteratorUtil.mergeOptions(ssio, destOpts);
+ 
    }
  
-   protected static void parseIterConf(IteratorScope scope, List<IterInfo> iters, Map<String,Map<String,String>> allOptions, AccumuloConfiguration conf) {
+   public static void parseIterConf(IteratorScope scope, List<IterInfo> iters, Map<String,Map<String,String>> allOptions, AccumuloConfiguration conf) {
 -    final Property scopeProperty = IteratorScope.getProperty(scope);
 +    final Property scopeProperty = getProperty(scope);
      final String scopePropertyKey = scopeProperty.getKey();
  
      for (Entry<String,String> entry : conf.getAllPropertiesWithPrefix(scopeProperty).entrySet()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d4882a15/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
index 0000000,2e34f38..39aa684
mode 000000,100644..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
@@@ -1,0 -1,164 +1,164 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.accumulo.tserver;
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ 
+ import org.apache.accumulo.core.client.impl.CompressedIterators;
+ import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
+ import org.apache.accumulo.core.conf.AccumuloConfiguration;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.ArrayByteSequence;
+ import org.apache.accumulo.core.data.ByteSequence;
+ import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.Range;
+ import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.data.thrift.IterInfo;
+ import org.apache.accumulo.core.data.thrift.TCMResult;
+ import org.apache.accumulo.core.data.thrift.TCMStatus;
+ import org.apache.accumulo.core.data.thrift.TCondition;
+ import org.apache.accumulo.core.iterators.IteratorUtil;
+ import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+ import org.apache.accumulo.tserver.data.ServerConditionalMutation;
+ import org.apache.hadoop.io.Text;
+ 
+ import com.google.common.base.Preconditions;
+ 
+ public class ConditionCheckerContext {
+   private CompressedIterators compressedIters;
+ 
+   private List<IterInfo> tableIters;
+   private Map<String,Map<String,String>> tableIterOpts;
+   private TabletIteratorEnvironment tie;
+   private String context;
+   private Map<String,Class<? extends SortedKeyValueIterator<Key,Value>>> classCache;
+ 
+   private static class MergedIterConfig {
+     List<IterInfo> mergedIters;
+     Map<String,Map<String,String>> mergedItersOpts;
+ 
+     MergedIterConfig(List<IterInfo> mergedIters, Map<String,Map<String,String>> mergedItersOpts) {
+       this.mergedIters = mergedIters;
+       this.mergedItersOpts = mergedItersOpts;
+     }
+   }
+ 
 -  private Map<ByteSequence,MergedIterConfig> mergedIterCache = new HashMap<ByteSequence,MergedIterConfig>();
++  private Map<ByteSequence,MergedIterConfig> mergedIterCache = new HashMap<>();
+ 
+   ConditionCheckerContext(CompressedIterators compressedIters, AccumuloConfiguration tableConf) {
+     this.compressedIters = compressedIters;
+ 
 -    tableIters = new ArrayList<IterInfo>();
 -    tableIterOpts = new HashMap<String,Map<String,String>>();
++    tableIters = new ArrayList<>();
++    tableIterOpts = new HashMap<>();
+ 
+     // parse table iterator config once
+     IteratorUtil.parseIterConf(IteratorScope.scan, tableIters, tableIterOpts, tableConf);
+ 
+     context = tableConf.get(Property.TABLE_CLASSPATH);
+ 
 -    classCache = new HashMap<String,Class<? extends SortedKeyValueIterator<Key,Value>>>();
++    classCache = new HashMap<>();
+ 
+     tie = new TabletIteratorEnvironment(IteratorScope.scan, tableConf);
+   }
+ 
+   SortedKeyValueIterator<Key,Value> buildIterator(SortedKeyValueIterator<Key,Value> systemIter, TCondition tc) throws IOException {
+ 
+     ArrayByteSequence key = new ArrayByteSequence(tc.iterators);
+     MergedIterConfig mic = mergedIterCache.get(key);
+     if (mic == null) {
+       IterConfig ic = compressedIters.decompress(tc.iterators);
+ 
 -      List<IterInfo> mergedIters = new ArrayList<IterInfo>(tableIters.size() + ic.ssiList.size());
 -      Map<String,Map<String,String>> mergedItersOpts = new HashMap<String,Map<String,String>>(tableIterOpts.size() + ic.ssio.size());
++      List<IterInfo> mergedIters = new ArrayList<>(tableIters.size() + ic.ssiList.size());
++      Map<String,Map<String,String>> mergedItersOpts = new HashMap<>(tableIterOpts.size() + ic.ssio.size());
+ 
+       IteratorUtil.mergeIteratorConfig(mergedIters, mergedItersOpts, tableIters, tableIterOpts, ic.ssiList, ic.ssio);
+ 
+       mic = new MergedIterConfig(mergedIters, mergedItersOpts);
+ 
+       mergedIterCache.put(key, mic);
+     }
+ 
+     return IteratorUtil.loadIterators(systemIter, mic.mergedIters, mic.mergedItersOpts, tie, true, context, classCache);
+   }
+ 
+   boolean checkConditions(SortedKeyValueIterator<Key,Value> systemIter, ServerConditionalMutation scm) throws IOException {
+     boolean add = true;
+ 
+     for (TCondition tc : scm.getConditions()) {
+ 
+       Range range;
+       if (tc.hasTimestamp)
+         range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
+       else
+         range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()));
+ 
+       SortedKeyValueIterator<Key,Value> iter = buildIterator(systemIter, tc);
+ 
+       ByteSequence cf = new ArrayByteSequence(tc.getCf());
+       iter.seek(range, Collections.singleton(cf), true);
+       Value val = null;
+       if (iter.hasTop()) {
+         val = iter.getTopValue();
+       }
+ 
+       if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) {
+         add = false;
+         break;
+       }
+     }
+     return add;
+   }
+ 
+   public class ConditionChecker {
+ 
+     private List<ServerConditionalMutation> conditionsToCheck;
+     private List<ServerConditionalMutation> okMutations;
+     private List<TCMResult> results;
+     private boolean checked = false;
+ 
+     public ConditionChecker(List<ServerConditionalMutation> conditionsToCheck, List<ServerConditionalMutation> okMutations, List<TCMResult> results) {
+       this.conditionsToCheck = conditionsToCheck;
+       this.okMutations = okMutations;
+       this.results = results;
+     }
+ 
+     public void check(SortedKeyValueIterator<Key,Value> systemIter) throws IOException {
+       Preconditions.checkArgument(!checked, "check() method should only be called once");
+       checked = true;
+ 
+       for (ServerConditionalMutation scm : conditionsToCheck) {
+         if (checkConditions(systemIter, scm)) {
+           okMutations.add(scm);
+         } else {
+           results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
+         }
+       }
+     }
+   }
+ 
+   public ConditionChecker newChecker(List<ServerConditionalMutation> conditionsToCheck, List<ServerConditionalMutation> okMutations, List<TCMResult> results) {
+     return new ConditionChecker(conditionsToCheck, okMutations, results);
+   }
+ }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d4882a15/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 1e0d119,6023ae3..038d3e8
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -57,11 -65,8 +57,10 @@@ import java.util.concurrent.locks.Reent
  import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.client.AccumuloException;
  import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Durability;
  import org.apache.accumulo.core.client.Instance;
  import org.apache.accumulo.core.client.impl.CompressedIterators;
- import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
 +import org.apache.accumulo.core.client.impl.DurabilityImpl;
  import org.apache.accumulo.core.client.impl.ScannerImpl;
  import org.apache.accumulo.core.client.impl.Tables;
  import org.apache.accumulo.core.client.impl.TabletLocator;
@@@ -91,9 -98,9 +90,8 @@@ import org.apache.accumulo.core.data.th
  import org.apache.accumulo.core.data.thrift.TCMResult;
  import org.apache.accumulo.core.data.thrift.TCMStatus;
  import org.apache.accumulo.core.data.thrift.TColumn;
- import org.apache.accumulo.core.data.thrift.TCondition;
  import org.apache.accumulo.core.data.thrift.TConditionalMutation;
  import org.apache.accumulo.core.data.thrift.TConditionalSession;
 -import org.apache.accumulo.core.data.thrift.TKey;
  import org.apache.accumulo.core.data.thrift.TKeyExtent;
  import org.apache.accumulo.core.data.thrift.TKeyValue;
  import org.apache.accumulo.core.data.thrift.TMutation;
@@@ -197,7 -193,21 +195,8 @@@ import org.apache.accumulo.server.zooke
  import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
  import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
  import org.apache.accumulo.start.classloader.vfs.ContextManager;
 -import org.apache.accumulo.trace.instrument.Span;
 -import org.apache.accumulo.trace.instrument.Trace;
 -import org.apache.accumulo.trace.thrift.TInfo;
 -import org.apache.accumulo.tserver.Compactor.CompactionInfo;
+ import org.apache.accumulo.tserver.ConditionCheckerContext.ConditionChecker;
  import org.apache.accumulo.tserver.RowLocks.RowLock;
 -import org.apache.accumulo.tserver.Tablet.CommitSession;
 -import org.apache.accumulo.tserver.Tablet.KVEntry;
 -import org.apache.accumulo.tserver.Tablet.LookupResult;
 -import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
 -import org.apache.accumulo.tserver.Tablet.ScanBatch;
 -import org.apache.accumulo.tserver.Tablet.Scanner;
 -import org.apache.accumulo.tserver.Tablet.SplitInfo;
 -import org.apache.accumulo.tserver.Tablet.TConstraintViolationException;
 -import org.apache.accumulo.tserver.Tablet.TabletClosedException;
  import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
  import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
  import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
@@@ -209,30 -219,10 +208,28 @@@ import org.apache.accumulo.tserver.log.
  import org.apache.accumulo.tserver.mastermessage.MasterMessage;
  import org.apache.accumulo.tserver.mastermessage.SplitReportMessage;
  import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
 -import org.apache.accumulo.tserver.metrics.TabletServerMBean;
 -import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
 +import org.apache.accumulo.tserver.metrics.TabletServerMetricsFactory;
  import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
  import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics;
 +import org.apache.accumulo.tserver.replication.ReplicationServicerHandler;
 +import org.apache.accumulo.tserver.replication.ReplicationWorker;
 +import org.apache.accumulo.tserver.scan.LookupTask;
 +import org.apache.accumulo.tserver.scan.NextBatchTask;
 +import org.apache.accumulo.tserver.scan.ScanRunState;
 +import org.apache.accumulo.tserver.session.ConditionalSession;
 +import org.apache.accumulo.tserver.session.MultiScanSession;
 +import org.apache.accumulo.tserver.session.ScanSession;
 +import org.apache.accumulo.tserver.session.Session;
 +import org.apache.accumulo.tserver.session.SessionManager;
 +import org.apache.accumulo.tserver.session.UpdateSession;
 +import org.apache.accumulo.tserver.tablet.CommitSession;
 +import org.apache.accumulo.tserver.tablet.CompactionInfo;
 +import org.apache.accumulo.tserver.tablet.CompactionWatcher;
 +import org.apache.accumulo.tserver.tablet.Compactor;
- import org.apache.accumulo.tserver.tablet.KVEntry;
 +import org.apache.accumulo.tserver.tablet.ScanBatch;
- import org.apache.accumulo.tserver.tablet.Scanner;
 +import org.apache.accumulo.tserver.tablet.SplitInfo;
 +import org.apache.accumulo.tserver.tablet.Tablet;
 +import org.apache.accumulo.tserver.tablet.TabletClosedException;
  import org.apache.commons.collections.map.LRUMap;
  import org.apache.hadoop.fs.FSError;
  import org.apache.hadoop.fs.FileSystem;
@@@ -256,151 -256,781 +253,150 @@@ public class TabletServer extends Accum
    private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
    private static final long TIME_BETWEEN_GC_CHECKS = 5000;
    private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = 60 * 60 * 1000;
-   private static final Set<Column> EMPTY_COLUMNS = Collections.emptySet();
  
 -  private TabletServerLogger logger;
 -
 -  protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
 +  private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
 +  private final TransactionWatcher watcher = new TransactionWatcher();
 +  private final ZooCache masterLockCache = new ZooCache();
  
 -  private ServerConfiguration serverConfig;
 -  private LogSorter logSorter = null;
 +  private final TabletServerLogger logger;
  
 -  public TabletServer(ServerConfiguration conf, VolumeManager fs) {
 -    super();
 -    this.serverConfig = conf;
 -    this.instance = conf.getInstance();
 -    this.fs = fs;
 +  private final TabletServerMetricsFactory metricsFactory;
 +  private final Metrics updateMetrics;
 +  private final Metrics scanMetrics;
 +  private final Metrics mincMetrics;
  
 -    log.info("Version " + Constants.VERSION);
 -    log.info("Instance " + instance.getInstanceID());
 -
 -    this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
 -    SimpleTimer.getInstance().schedule(new Runnable() {
 -      @Override
 -      public void run() {
 -        synchronized (onlineTablets) {
 -          long now = System.currentTimeMillis();
 -          for (Tablet tablet : onlineTablets.values())
 -            try {
 -              tablet.updateRates(now);
 -            } catch (Exception ex) {
 -              log.error(ex, ex);
 -            }
 -        }
 -      }
 -    }, TIME_BETWEEN_GC_CHECKS, TIME_BETWEEN_GC_CHECKS);
 -    SimpleTimer.getInstance().schedule(new Runnable() {
 -      @Override
 -      public void run() {
 -        TabletLocator.clearLocators();
 -      }
 -    }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
 +  public Metrics getMinCMetrics() {
 +    return mincMetrics;
    }
  
 -  private static long jitter(long ms) {
 -    Random r = new Random();
 -    // add a random 10% wait
 -    return (long) ((1. + (r.nextDouble() / 10)) * ms);
 -  }
 -
 -  private synchronized static void logGCInfo(AccumuloConfiguration conf) {
 -    long now = System.currentTimeMillis();
 -
 -    List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
 -    Runtime rt = Runtime.getRuntime();
 -
 -    StringBuilder sb = new StringBuilder("gc");
 -
 -    boolean sawChange = false;
 -
 -    long maxIncreaseInCollectionTime = 0;
 -
 -    for (GarbageCollectorMXBean gcBean : gcmBeans) {
 -      Long prevTime = prevGcTime.get(gcBean.getName());
 -      long pt = 0;
 -      if (prevTime != null) {
 -        pt = prevTime;
 -      }
 -
 -      long time = gcBean.getCollectionTime();
 -
 -      if (time - pt != 0) {
 -        sawChange = true;
 -      }
 -
 -      long increaseInCollectionTime = time - pt;
 -      sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), time / 1000.0, increaseInCollectionTime / 1000.0));
 -      maxIncreaseInCollectionTime = Math.max(increaseInCollectionTime, maxIncreaseInCollectionTime);
 -      prevGcTime.put(gcBean.getName(), time);
 -    }
 -
 -    long mem = rt.freeMemory();
 -    if (maxIncreaseInCollectionTime == 0) {
 -      gcTimeIncreasedCount = 0;
 -    } else {
 -      gcTimeIncreasedCount++;
 -      if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * 0.05) {
 -        log.warn("Running low on memory");
 -        gcTimeIncreasedCount = 0;
 -      }
 -    }
 -
 -    if (mem > lastMemorySize) {
 -      sawChange = true;
 -    }
 -
 -    String sign = "+";
 -    if (mem - lastMemorySize <= 0) {
 -      sign = "";
 -    }
 -
 -    sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize), rt.totalMemory()));
 -
 -    if (sawChange) {
 -      log.debug(sb.toString());
 -    }
 -
 -    final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
 -    if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) {
 -      long diff = now - lastMemoryCheckTime;
 -      if (diff > keepAliveTimeout + 1000) {
 -        log.warn(String.format("GC pause checker not called in a timely fashion. Expected every %.1f seconds but was %.1f seconds since last check",
 -            TIME_BETWEEN_GC_CHECKS / 1000., diff / 1000.));
 -      }
 -      lastMemoryCheckTime = now;
 -      return;
 -    }
 -
 -    if (maxIncreaseInCollectionTime > keepAliveTimeout) {
 -      Halt.halt("Garbage collection may be interfering with lock keep-alive.  Halting.", -1);
 -    }
 -
 -    lastMemorySize = mem;
 -    lastMemoryCheckTime = now;
 -  }
 -
 -  private TabletStatsKeeper statsKeeper;
 -
 -  private static class Session {
 -    long lastAccessTime;
 -    long startTime;
 -    String user;
 -    String client = TServerUtils.clientAddress.get();
 -    public boolean reserved;
 -
 -    public boolean cleanup() {
 -      return true;
 -    }
 -  }
 -
 -  private static class SessionManager {
 -
 -    SecureRandom random;
 -    Map<Long,Session> sessions;
 -    private long maxIdle;
 -    private long maxUpdateIdle;
 -    private List<Session> idleSessions = new ArrayList<Session>();
 -    private final Long expiredSessionMarker = new Long(-1);
 -
 -    SessionManager(AccumuloConfiguration conf) {
 -      random = new SecureRandom();
 -      sessions = new HashMap<Long,Session>();
 -      maxUpdateIdle = conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE);
 -      maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
 -
 -      Runnable r = new Runnable() {
 -        @Override
 -        public void run() {
 -          sweep(maxIdle, maxUpdateIdle);
 -        }
 -      };
 -
 -      SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000));
 -    }
 -
 -    synchronized long createSession(Session session, boolean reserve) {
 -      long sid = random.nextLong();
 -
 -      while (sessions.containsKey(sid)) {
 -        sid = random.nextLong();
 -      }
 -
 -      sessions.put(sid, session);
 -
 -      session.reserved = reserve;
 -
 -      session.startTime = session.lastAccessTime = System.currentTimeMillis();
 -
 -      return sid;
 -    }
 -
 -    long getMaxIdleTime() {
 -      return maxIdle;
 -    }
 -
 -    /**
 -     * while a session is reserved, it cannot be canceled or removed
 -     */
 -    synchronized Session reserveSession(long sessionId) {
 -      Session session = sessions.get(sessionId);
 -      if (session != null) {
 -        if (session.reserved)
 -          throw new IllegalStateException();
 -        session.reserved = true;
 -      }
 -
 -      return session;
 -
 -    }
 -
 -    synchronized Session reserveSession(long sessionId, boolean wait) {
 -      Session session = sessions.get(sessionId);
 -      if (session != null) {
 -        while (wait && session.reserved) {
 -          try {
 -            wait(1000);
 -          } catch (InterruptedException e) {
 -            throw new RuntimeException();
 -          }
 -        }
 -
 -        if (session.reserved)
 -          throw new IllegalStateException();
 -        session.reserved = true;
 -      }
 -
 -      return session;
 -
 -    }
 -
 -    synchronized void unreserveSession(Session session) {
 -      if (!session.reserved)
 -        throw new IllegalStateException();
 -      notifyAll();
 -      session.reserved = false;
 -      session.lastAccessTime = System.currentTimeMillis();
 -    }
 -
 -    synchronized void unreserveSession(long sessionId) {
 -      Session session = getSession(sessionId);
 -      if (session != null)
 -        unreserveSession(session);
 -    }
 -
 -    synchronized Session getSession(long sessionId) {
 -      Session session = sessions.get(sessionId);
 -      if (session != null)
 -        session.lastAccessTime = System.currentTimeMillis();
 -      return session;
 -    }
 -
 -    Session removeSession(long sessionId) {
 -      return removeSession(sessionId, false);
 -    }
 -
 -    Session removeSession(long sessionId, boolean unreserve) {
 -      Session session = null;
 -      synchronized (this) {
 -        session = sessions.remove(sessionId);
 -        if (unreserve && session != null)
 -          unreserveSession(session);
 -      }
 -
 -      // do clean up out side of lock..
 -      if (session != null)
 -        session.cleanup();
 -
 -      return session;
 -    }
 -
 -    private void sweep(final long maxIdle, final long maxUpdateIdle) {
 -      ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
 -      synchronized (this) {
 -        Iterator<Session> iter = sessions.values().iterator();
 -        while (iter.hasNext()) {
 -          Session session = iter.next();
 -          long configuredIdle = maxIdle;
 -          if (session instanceof UpdateSession) {
 -            configuredIdle = maxUpdateIdle;
 -          }
 -          long idleTime = System.currentTimeMillis() - session.lastAccessTime;
 -          if (idleTime > configuredIdle && !session.reserved) {
 -            log.info("Closing idle session from user=" + session.user + ", client=" + session.client + ", idle=" + idleTime + "ms");
 -            iter.remove();
 -            sessionsToCleanup.add(session);
 -          }
 -        }
 -      }
 -
 -      // do clean up outside of lock for TabletServer in a synchronized block for simplicity vice a synchronized list
 -
 -      synchronized (idleSessions) {
 -
 -        sessionsToCleanup.addAll(idleSessions);
 -
 -        idleSessions.clear();
 -
 -        // perform cleanup for all of the sessions
 -        for (Session session : sessionsToCleanup) {
 -          if (!session.cleanup())
 -            idleSessions.add(session);
 -        }
 -      }
 -
 -    }
 -
 -    synchronized void removeIfNotAccessed(final long sessionId, final long delay) {
 -      Session session = sessions.get(sessionId);
 -      if (session != null) {
 -        final long removeTime = session.lastAccessTime;
 -        TimerTask r = new TimerTask() {
 -          @Override
 -          public void run() {
 -            Session sessionToCleanup = null;
 -            synchronized (SessionManager.this) {
 -              Session session2 = sessions.get(sessionId);
 -              if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) {
 -                log.info("Closing not accessed session from user=" + session2.user + ", client=" + session2.client + ", duration=" + delay + "ms");
 -                sessions.remove(sessionId);
 -                sessionToCleanup = session2;
 -              }
 -            }
 -
 -            // call clean up outside of lock
 -            if (sessionToCleanup != null)
 -              sessionToCleanup.cleanup();
 -          }
 -        };
 -
 -        SimpleTimer.getInstance().schedule(r, delay);
 -      }
 -    }
 -
 -    public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
 -      Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
 -      Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>();
 -
 -      synchronized (idleSessions) {
 -        /**
 -         * Add sessions so that get the list returned in the active scans call
 -         */
 -        for (Session session : idleSessions) {
 -          copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session));
 -        }
 -      }
 -
 -      for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) {
 -
 -        Session session = entry.getValue();
 -        @SuppressWarnings("rawtypes")
 -        ScanTask nbt = null;
 -        String tableID = null;
 -
 -        if (session instanceof ScanSession) {
 -          ScanSession ss = (ScanSession) session;
 -          nbt = ss.nextBatchTask;
 -          tableID = ss.extent.getTableId().toString();
 -        } else if (session instanceof MultiScanSession) {
 -          MultiScanSession mss = (MultiScanSession) session;
 -          nbt = mss.lookupTask;
 -          tableID = mss.threadPoolExtent.getTableId().toString();
 -        }
 -
 -        if (nbt == null)
 -          continue;
 +  private final LogSorter logSorter;
 +  private ReplicationWorker replWorker = null;
 +  private final TabletStatsKeeper statsKeeper;
 +  private final AtomicInteger logIdGenerator = new AtomicInteger();
  
 -        ScanRunState srs = nbt.getScanRunState();
 +  private final AtomicLong flushCounter = new AtomicLong(0);
 +  private final AtomicLong syncCounter = new AtomicLong(0);
  
 -        if (srs == ScanRunState.FINISHED)
 -          continue;
 +  private final VolumeManager fs;
  
 -        MapCounter<ScanRunState> stateCounts = counts.get(tableID);
 -        if (stateCounts == null) {
 -          stateCounts = new MapCounter<ScanRunState>();
 -          counts.put(tableID, stateCounts);
 -        }
 +  private final SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
 +  private final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
 +  private final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
 +  @SuppressWarnings("unchecked")
 +  private final Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new LRUMap(1000));
  
 -        stateCounts.increment(srs, 1);
 -      }
 +  private final TabletServerResourceManager resourceManager;
 +  private final SecurityOperation security;
  
 -      return counts;
 -    }
 +  private final BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<MasterMessage>();
  
 -    public synchronized List<ActiveScan> getActiveScans() {
 +  private Thread majorCompactorThread;
  
 -      final List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
 -      final long ct = System.currentTimeMillis();
 -      final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>();
 +  private HostAndPort replicationAddress;
 +  private HostAndPort clientAddress;
  
 -      synchronized (idleSessions) {
 -        /**
 -         * Add sessions so that get the list returned in the active scans call
 -         */
 -        for (Session session : idleSessions) {
 -          copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session));
 -        }
 -      }
 +  private volatile boolean serverStopRequested = false;
 +  private volatile boolean majorCompactorDisabled = false;
 +  private volatile boolean shutdownComplete = false;
  
 -      for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) {
 -        Session session = entry.getValue();
 -        if (session instanceof ScanSession) {
 -          ScanSession ss = (ScanSession) session;
 +  private ZooLock tabletServerLock;
  
 -          ScanState state = ScanState.RUNNING;
 +  private TServer server;
 +  private TServer replServer;
  
 -          ScanTask<ScanBatch> nbt = ss.nextBatchTask;
 -          if (nbt == null) {
 -            state = ScanState.IDLE;
 -          } else {
 -            switch (nbt.getScanRunState()) {
 -              case QUEUED:
 -                state = ScanState.QUEUED;
 -                break;
 -              case FINISHED:
 -                state = ScanState.IDLE;
 -                break;
 -              case RUNNING:
 -              default:
 -                /* do nothing */
 -                break;
 -            }
 -          }
 +  private DistributedWorkQueue bulkFailedCopyQ;
  
 -          ActiveScan activeScan = new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime,
 -              ScanType.SINGLE, state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio,
 -              ss.auths.getAuthorizationsBB());
 +  private String lockID;
  
 -          // scanId added by ACCUMULO-2641 is an optional thrift argument and not available in ActiveScan constructor
 -          activeScan.setScanId(entry.getKey());
 -          activeScans.add(activeScan);
 +  public static final AtomicLong seekCount = new AtomicLong(0);
  
 -        } else if (session instanceof MultiScanSession) {
 -          MultiScanSession mss = (MultiScanSession) session;
 +  private final AtomicLong totalMinorCompactions = new AtomicLong(0);
 +  private final ServerConfigurationFactory confFactory;
  
 -          ScanState state = ScanState.RUNNING;
 +  private final ZooAuthenticationKeyWatcher authKeyWatcher;
  
 -          ScanTask<MultiScanResult> nbt = mss.lookupTask;
 -          if (nbt == null) {
 -            state = ScanState.IDLE;
 -          } else {
 -            switch (nbt.getScanRunState()) {
 -              case QUEUED:
 -                state = ScanState.QUEUED;
 -                break;
 -              case FINISHED:
 -                state = ScanState.IDLE;
 -                break;
 -              case RUNNING:
 -              default:
 -                /* do nothing */
 -                break;
 +  public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) {
 +    super(confFactory);
 +    this.confFactory = confFactory;
 +    this.fs = fs;
 +    AccumuloConfiguration aconf = getConfiguration();
 +    Instance instance = getInstance();
 +    log.info("Version " + Constants.VERSION);
 +    log.info("Instance " + instance.getInstanceID());
 +    this.sessionManager = new SessionManager(aconf);
 +    this.logSorter = new LogSorter(instance, fs, aconf);
 +    this.replWorker = new ReplicationWorker(this, fs);
 +    this.statsKeeper = new TabletStatsKeeper();
 +    SimpleTimer.getInstance(aconf).schedule(new Runnable() {
 +      @Override
 +      public void run() {
 +        synchronized (onlineTablets) {
 +          long now = System.currentTimeMillis();
 +          for (Tablet tablet : onlineTablets.values())
 +            try {
 +              tablet.updateRates(now);
 +            } catch (Exception ex) {
 +              log.error("Error updating rates for {}", tablet.getExtent(), ex);
              }
 -          }
 -
 -          activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
 -              ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, mss.auths
 -                  .getAuthorizationsBB()));
          }
        }
 +    }, 5000, 5000);
  
 -      return activeScans;
 -    }
 -  }
 -
 -  static class TservConstraintEnv implements Environment {
 -
 -    private TCredentials credentials;
 -    private SecurityOperation security;
 -    private Authorizations auths;
 -    private KeyExtent ke;
 -
 -    TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) {
 -      this.security = secOp;
 -      this.credentials = credentials;
 -    }
 -
 -    void setExtent(KeyExtent ke) {
 -      this.ke = ke;
 -    }
 -
 -    @Override
 -    public KeyExtent getExtent() {
 -      return ke;
 -    }
 -
 -    @Override
 -    public String getUser() {
 -      return credentials.getPrincipal();
 -    }
 -
 -    @Override
 -    @Deprecated
 -    public Authorizations getAuthorizations() {
 -      if (auths == null)
 -        try {
 -          this.auths = security.getUserAuthorizations(credentials);
 -        } catch (ThriftSecurityException e) {
 -          throw new RuntimeException(e);
 -        }
 -      return auths;
 -    }
 -
 -    @Override
 -    public AuthorizationContainer getAuthorizationsContainer() {
 -      return new AuthorizationContainer() {
 -        @Override
 -        public boolean contains(ByteSequence auth) {
 -          try {
 -            return security.userHasAuthorizations(credentials,
 -                Collections.<ByteBuffer> singletonList(ByteBuffer.wrap(auth.getBackingArray(), auth.offset(), auth.length())));
 -          } catch (ThriftSecurityException e) {
 -            throw new RuntimeException(e);
 -          }
 -        }
 -      };
 -    }
 -  }
 -
 -  private abstract class ScanTask<T> implements RunnableFuture<T> {
 -
 -    protected AtomicBoolean interruptFlag;
 -    protected ArrayBlockingQueue<Object> resultQueue;
 -    protected AtomicInteger state;
 -    protected AtomicReference<ScanRunState> runState;
 -
 -    private static final int INITIAL = 1;
 -    private static final int ADDED = 2;
 -    private static final int CANCELED = 3;
 -
 -    ScanTask() {
 -      interruptFlag = new AtomicBoolean(false);
 -      runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
 -      state = new AtomicInteger(INITIAL);
 -      resultQueue = new ArrayBlockingQueue<Object>(1);
 -    }
 -
 -    protected void addResult(Object o) {
 -      if (state.compareAndSet(INITIAL, ADDED))
 -        resultQueue.add(o);
 -      else if (state.get() == ADDED)
 -        throw new IllegalStateException("Tried to add more than one result");
 -    }
 -
 -    @Override
 -    public boolean cancel(boolean mayInterruptIfRunning) {
 -      if (!mayInterruptIfRunning)
 -        throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task");
 -
 -      if (state.get() == CANCELED)
 -        return true;
 -
 -      if (state.compareAndSet(INITIAL, CANCELED)) {
 -        interruptFlag.set(true);
 -        resultQueue = null;
 -        return true;
 -      }
 -
 -      return false;
 -    }
 -
 -    @Override
 -    public T get() throws InterruptedException, ExecutionException {
 -      throw new UnsupportedOperationException();
 -    }
 -
 -    @SuppressWarnings("unchecked")
 -    @Override
 -    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
 -
 -      ArrayBlockingQueue<Object> localRQ = resultQueue;
 -
 -      if (state.get() == CANCELED)
 -        throw new CancellationException();
 -
 -      if (localRQ == null) {
 -        int st = state.get();
 -        String stateStr;
 -        switch (st) {
 -          case ADDED:
 -            stateStr = "ADDED";
 -            break;
 -          case CANCELED:
 -            stateStr = "CANCELED";
 -            break;
 -          case INITIAL:
 -            stateStr = "INITIAL";
 -            break;
 -          default:
 -            stateStr = "UNKNOWN";
 -            break;
 -        }
 -        throw new IllegalStateException("Tried to get result twice [state=" + stateStr + "(" + st + ")]");
 -      }
 -
 -      Object r = localRQ.poll(timeout, unit);
 -
 -      // could have been canceled while waiting
 -      if (state.get() == CANCELED) {
 -        if (r != null)
 -          throw new IllegalStateException("Nothing should have been added when in canceled state");
 +    final long walogMaxSize = aconf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE);
 +    final long minBlockSize = CachedConfiguration.getInstance().getLong("dfs.namenode.fs-limits.min-block-size", 0);
 +    if (minBlockSize != 0 && minBlockSize > walogMaxSize)
 +      throw new RuntimeException("Unable to start TabletServer. Logger is set to use blocksize " + walogMaxSize + " but hdfs minimum block size is "
 +          + minBlockSize + ". Either increase the " + Property.TSERV_WALOG_MAX_SIZE + " or decrease dfs.namenode.fs-limits.min-block-size in hdfs-site.xml.");
  
 -        throw new CancellationException();
 +    final long toleratedWalCreationFailures = aconf.getCount(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES);
 +    final long walCreationFailureRetryIncrement = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_WAIT_INCREMENT);
 +    final long walCreationFailureRetryMax = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION);
 +    // Tolerate `toleratedWalCreationFailures` failures, waiting `walCreationFailureRetryIncrement` milliseconds after the first failure,
 +    // incrementing each subsequent wait period by the same value, up to a maximum wait of `walCreationFailureRetryMax` milliseconds.
 +    final RetryFactory walCreationRetryFactory = new RetryFactory(toleratedWalCreationFailures, walCreationFailureRetryIncrement,
 +        walCreationFailureRetryIncrement, walCreationFailureRetryMax);
 +
 +    logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter, walCreationRetryFactory);
 +    this.resourceManager = new TabletServerResourceManager(this, fs);
 +    this.security = AuditedSecurityOperation.getInstance(this);
 +
 +    metricsFactory = new TabletServerMetricsFactory(aconf);
 +    updateMetrics = metricsFactory.createUpdateMetrics();
 +    scanMetrics = metricsFactory.createScanMetrics();
 +    mincMetrics = metricsFactory.createMincMetrics();
 +    SimpleTimer.getInstance(aconf).schedule(new Runnable() {
 +      @Override
 +      public void run() {
 +        TabletLocator.clearLocators();
        }
 +    }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
  
 -      if (r == null)
 -        throw new TimeoutException();
 -
 -      // make this method stop working now that something is being
 -      // returned
 -      resultQueue = null;
 -
 -      if (r instanceof Throwable)
 -        throw new ExecutionException((Throwable) r);
 -
 -      return (T) r;
 -    }
 -
 -    @Override
 -    public boolean isCancelled() {
 -      return state.get() == CANCELED;
 -    }
 -
 -    @Override
 -    public boolean isDone() {
 -      return runState.get().equals(ScanRunState.FINISHED);
 -    }
 -
 -    public ScanRunState getScanRunState() {
 -      return runState.get();
 -    }
 -
 -  }
 -
 -  private static class ConditionalSession extends Session {
 -    public TCredentials credentials;
 -    public Authorizations auths;
 -    public String tableId;
 -    public AtomicBoolean interruptFlag;
 -
 -    @Override
 -    public boolean cleanup() {
 -      interruptFlag.set(true);
 -      return true;
 -    }
 -  }
 -
 -  private static class UpdateSession extends Session {
 -    public Tablet currentTablet;
 -    public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
 -    Map<KeyExtent,Long> failures = new HashMap<KeyExtent,Long>();
 -    HashMap<KeyExtent,SecurityErrorCode> authFailures = new HashMap<KeyExtent,SecurityErrorCode>();
 -    public Violations violations;
 -    public TCredentials credentials;
 -    public long totalUpdates = 0;
 -    public long flushTime = 0;
 -    Stat prepareTimes = new Stat();
 -    Stat walogTimes = new Stat();
 -    Stat commitTimes = new Stat();
 -    Stat authTimes = new Stat();
 -    public Map<Tablet,List<Mutation>> queuedMutations = new HashMap<Tablet,List<Mutation>>();
 -    public long queuedMutationSize = 0;
 -    TservConstraintEnv cenv = null;
 -  }
 -
 -  private static class ScanSession extends Session {
 -    public KeyExtent extent;
 -    public HashSet<Column> columnSet;
 -    public List<IterInfo> ssiList;
 -    public Map<String,Map<String,String>> ssio;
 -    public Authorizations auths;
 -    public long entriesReturned = 0;
 -    public Stat nbTimes = new Stat();
 -    public long batchCount = 0;
 -    public volatile ScanTask<ScanBatch> nextBatchTask;
 -    public AtomicBoolean interruptFlag;
 -    public Scanner scanner;
 -    public long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
 -
 -    @Override
 -    public boolean cleanup() {
 -      final boolean ret;
 -      try {
 -        if (nextBatchTask != null)
 -          nextBatchTask.cancel(true);
 -      } finally {
 -        if (scanner != null)
 -          ret = scanner.close();
 -        else
 -          ret = true;
 -      }
 -      return ret;
 +    // Create the secret manager
 +    setSecretManager(new AuthenticationTokenSecretManager(instance, aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME)));
 +    if (aconf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
 +      log.info("SASL is enabled, creating ZooKeeper watcher for AuthenticationKeys");
 +      // Watcher to notice new AuthenticationKeys which enable delegation tokens
 +      authKeyWatcher = new ZooAuthenticationKeyWatcher(getSecretManager(), ZooReaderWriter.getInstance(), ZooUtil.getRoot(instance)
 +          + Constants.ZDELEGATION_TOKEN_KEYS);
 +    } else {
 +      authKeyWatcher = null;
      }
 -
    }
  
 -  private static class MultiScanSession extends Session {
 -    HashSet<Column> columnSet;
 -    Map<KeyExtent,List<Range>> queries;
 -    public List<IterInfo> ssiList;
 -    public Map<String,Map<String,String>> ssio;
 -    public Authorizations auths;
 -
 -    // stats
 -    int numRanges;
 -    int numTablets;
 -    int numEntries;
 -    long totalLookupTime;
 -
 -    public volatile ScanTask<MultiScanResult> lookupTask;
 -    public KeyExtent threadPoolExtent;
 -
 -    @Override
 -    public boolean cleanup() {
 -      if (lookupTask != null)
 -        lookupTask.cancel(true);
 -      // the cancellation should provide us the safety to return true here
 -      return true;
 -    }
 +  private static long jitter(long ms) {
 +    Random r = new Random();
 +    // add a random extra wait of up to 10%
 +    return (long) ((1. + (r.nextDouble() / 10)) * ms);
    }
  
 -  /**
 -   * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids
 -   * are monotonically increasing.
 -   *
 -   */
 -  static class WriteTracker {
 -    private static AtomicLong operationCounter = new AtomicLong(1);
 -    private Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
 -
 -    WriteTracker() {
 -      for (TabletType ttype : TabletType.values()) {
 -        inProgressWrites.put(ttype, new TreeSet<Long>());
 -      }
 -    }
 -
 -    synchronized long startWrite(TabletType ttype) {
 -      long operationId = operationCounter.getAndIncrement();
 -      inProgressWrites.get(ttype).add(operationId);
 -      return operationId;
 -    }
 -
 -    synchronized void finishWrite(long operationId) {
 -      if (operationId == -1)
 -        return;
 +  private final SessionManager sessionManager;
  
 -      boolean removed = false;
 +  private final WriteTracker writeTracker = new WriteTracker();
  
 -      for (TabletType ttype : TabletType.values()) {
 -        removed = inProgressWrites.get(ttype).remove(operationId);
 -        if (removed)
 -          break;
 -      }
 +  private final RowLocks rowLocks = new RowLocks();
  
 -      if (!removed) {
 -        throw new IllegalArgumentException("Attempted to finish write not in progress,  operationId " + operationId);
 -      }
 -
 -      this.notifyAll();
 -    }
 -
 -    synchronized void waitForWrites(TabletType ttype) {
 -      long operationId = operationCounter.getAndIncrement();
 -      while (inProgressWrites.get(ttype).floor(operationId) != null) {
 -        try {
 -          this.wait();
 -        } catch (InterruptedException e) {
 -          log.error(e, e);
 -        }
 -      }
 -    }
 -
 -    public long startWrite(Set<Tablet> keySet) {
 -      if (keySet.size() == 0)
 -        return -1;
 -
 -      ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
 -
 -      for (Tablet tablet : keySet)
 -        extents.add(tablet.getExtent());
 -
 -      return startWrite(TabletType.type(extents));
 -    }
 -  }
 -
 -  public AccumuloConfiguration getSystemConfiguration() {
 -    return serverConfig.getConfiguration();
 -  }
 -
 -  TransactionWatcher watcher = new TransactionWatcher();
 +  private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
 +  private final ReentrantLock recoveryLock = new ReentrantLock(true);
  
    private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
  
@@@ -1071,6 -1930,7 +1067,7 @@@
        Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter = updates.entrySet().iterator();
  
        final CompressedIterators compressedIters = new CompressedIterators(symbols);
 -      ConditionCheckerContext checkerContext = new ConditionCheckerContext(compressedIters, ServerConfiguration.getTableConfiguration(instance, cs.tableId));
++      ConditionCheckerContext checkerContext = new ConditionCheckerContext(compressedIters, confFactory.getTableConfiguration(cs.tableId));
  
        while (iter.hasNext()) {
          final Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next();
@@@ -1082,65 -1942,44 +1079,28 @@@
            iter.remove();
          } else {
            final List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(entry.getValue().size());
+           final List<TCMResult> resultsSubList = results.subList(results.size(), results.size());
  
-           for (ServerConditionalMutation scm : entry.getValue()) {
-             if (checkCondition(results, cs, compressedIters, tablet, scm))
-               okMutations.add(scm);
-           }
- 
-           entry.setValue(okMutations);
-         }
- 
-       }
-     }
- 
-     private boolean checkCondition(ArrayList<TCMResult> results, ConditionalSession cs, CompressedIterators compressedIters, Tablet tablet,
-         ServerConditionalMutation scm) throws IOException {
-       boolean add = true;
- 
-       for (TCondition tc : scm.getConditions()) {
- 
-         Range range;
-         if (tc.hasTimestamp)
-           range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
-         else
-           range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()));
- 
-         IterConfig ic = compressedIters.decompress(tc.iterators);
- 
-         Scanner scanner = tablet.createScanner(range, 1, EMPTY_COLUMNS, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag);
- 
-         try {
-           ScanBatch batch = scanner.read();
- 
-           Value val = null;
+           ConditionChecker checker = checkerContext.newChecker(entry.getValue(), okMutations, resultsSubList);
+           try {
+             tablet.checkConditions(checker, cs.auths, cs.interruptFlag);
  
-           for (KVEntry entry2 : batch.getResults()) {
-             val = entry2.getValue();
-             break;
-           }
+             if (okMutations.size() > 0) {
+               entry.setValue(okMutations);
+             } else {
+               iter.remove();
+             }
 -          } catch (TabletClosedException e) {
 -            // clear anything added while checking conditions.
 -            resultsSubList.clear();
 -
 -            for (ServerConditionalMutation scm : entry.getValue()) {
 -              results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
 -            }
 -            iter.remove();
 -          } catch (IterationInterruptedException e) {
 -            // clear anything added while checking conditions.
 -            resultsSubList.clear();
 -
 -            for (ServerConditionalMutation scm : entry.getValue()) {
 -              results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
 -            }
 -            iter.remove();
 -          } catch (TooManyFilesException e) {
++          } catch (TabletClosedException | IterationInterruptedException | TooManyFilesException e) {
+             // clear anything added while checking conditions.
+             resultsSubList.clear();
  
-           if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) {
-             results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
-             add = false;
-             break;
+             for (ServerConditionalMutation scm : entry.getValue()) {
+               results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+             }
+             iter.remove();
            }
- 
-         } catch (TabletClosedException e) {
-           results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-           add = false;
-           break;
-         } catch (IterationInterruptedException iie) {
-           results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-           add = false;
-           break;
-         } catch (TooManyFilesException tmfe) {
-           results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-           add = false;
-           break;
          }
        }
-       return add;
      }
  
      private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession sess) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d4882a15/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
index 33277bd,0000000..f586e2e
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
@@@ -1,228 -1,0 +1,247 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.tablet;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
++import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
++import java.util.Set;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.data.Column;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.data.thrift.IterInfo;
 +import org.apache.accumulo.core.iterators.IterationInterruptedException;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.IteratorUtil;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
 +import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
 +import org.apache.accumulo.core.iterators.system.DeletingIterator;
 +import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
 +import org.apache.accumulo.core.iterators.system.MultiIterator;
 +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
 +import org.apache.accumulo.core.iterators.system.StatsIterator;
 +import org.apache.accumulo.core.iterators.system.VisibilityFilter;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.server.fs.FileRef;
 +import org.apache.accumulo.tserver.FileManager.ScanFileManager;
 +import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
 +import org.apache.accumulo.tserver.TabletIteratorEnvironment;
 +import org.apache.accumulo.tserver.TabletServer;
 +
 +class ScanDataSource implements DataSource {
 +  // Data source for a tablet scan: reserves the tablet's in-memory and file iterators and layers the scan-time system iterators on top of them.
 +  // data source state
 +  private final Tablet tablet;
 +  private ScanFileManager fileManager; // created lazily in createIterator(); set back to null in close()
 +  private SortedKeyValueIterator<Key,Value> iter; // cached iterator stack; null until iterator() builds it or after getNewDataSource() clears it
 +  private long expectedDeletionCount; // value of tablet.getDataSourceDeletions() this source was last synced with
 +  private List<MemoryIterator> memIters = null; // non-null only while in-memory iterators and a file reservation are held
 +  private long fileReservationId; // id returned by reserveFilesForScan(); reset to -1 once the files are returned
 +  private AtomicBoolean interruptFlag; // checked in createIterator() and installed on every underlying iterator
 +  private StatsIterator statsIterator; // reported in close() if an iterator stack was ever built
 +
 +  private final ScanOptions options;
++  private final boolean loadIters; // when false, createIterator() skips IteratorUtil.loadIterators and returns the system iterator stack
++
++  private static final Set<Column> EMPTY_COLS = Collections.emptySet();
 +  // Builds the ScanOptions from the individual scan settings; loadIters is true, so createIterator() calls IteratorUtil.loadIterators.
 +  ScanDataSource(Tablet tablet, Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList,
 +      Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag) {
 +    this.tablet = tablet;
 +    expectedDeletionCount = tablet.getDataSourceDeletions();
 +    this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false);
 +    this.interruptFlag = interruptFlag;
++    this.loadIters = true;
 +  }
 +  // Uses caller-supplied ScanOptions and takes the interrupt flag from them; loadIters is true.
 +  ScanDataSource(Tablet tablet, ScanOptions options) {
 +    this.tablet = tablet;
 +    expectedDeletionCount = tablet.getDataSourceDeletions();
 +    this.options = options;
 +    this.interruptFlag = options.getInterruptFlag();
++    this.loadIters = true;
++  }
++  // Variant that passes an empty column set and null iterator settings to ScanOptions and sets loadIters to false.
++  ScanDataSource(Tablet tablet, Authorizations authorizations, byte[] defaultLabels, AtomicBoolean iFlag) {
++    this.tablet = tablet;
++    expectedDeletionCount = tablet.getDataSourceDeletions();
++    this.options = new ScanOptions(-1, authorizations, defaultLabels, EMPTY_COLS, null, null, iFlag, false);
++    this.interruptFlag = iFlag;
++    this.loadIters = false;
 +  }
 +  // If the tablet's data-source deletion count changed, releases the held iterators and files so iterator() rebuilds them; always returns this.
 +  @Override
 +  public DataSource getNewDataSource() {
 +    if (!isCurrent()) {
 +      // log.debug("Switching data sources during a scan");
 +      if (memIters != null) {
 +        tablet.getTabletMemory().returnIterators(memIters);
 +        memIters = null;
 +        tablet.getDatafileManager().returnFilesForScan(fileReservationId);
 +        fileReservationId = -1;
 +      }
 +
 +      if (fileManager != null)
 +        fileManager.releaseOpenFiles(false);
 +
 +      expectedDeletionCount = tablet.getDataSourceDeletions(); // resync so isCurrent() is true again
 +      iter = null; // forces iterator() to call createIterator() on next use
 +
 +      return this;
 +    } else
 +      return this;
 +  }
 +  // True while the tablet's data-source deletion count still equals the one recorded by this source.
 +  @Override
 +  public boolean isCurrent() {
 +    return expectedDeletionCount == tablet.getDataSourceDeletions();
 +  }
 +  // Returns the cached iterator stack, building it with createIterator() when none is cached.
 +  @Override
 +  public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
 +    if (iter == null)
 +      iter = createIterator();
 +    return iter;
 +  }
 +  // Reserves memory and file iterators under the tablet lock, then wraps them in the system iterators and, if loadIters is set, the loaded iterators.
 +  private SortedKeyValueIterator<Key,Value> createIterator() throws IOException {
 +
 +    Map<FileRef,DataFileValue> files;
 +
 +    synchronized (tablet) { // state checks and reservations happen while holding the tablet monitor
 +
 +      if (memIters != null)
 +        throw new IllegalStateException("Tried to create new scan iterator w/o releasing memory");
 +
 +      if (tablet.isClosed())
 +        throw new TabletClosedException();
 +
 +      if (interruptFlag.get())
 +        throw new IterationInterruptedException(tablet.getExtent().toString() + " " + interruptFlag.hashCode());
 +
 +      // only acquire the file manager when we know the tablet is open
 +      if (fileManager == null) {
 +        fileManager = tablet.getTabletResources().newScanFileManager();
 +        tablet.addActiveScans(this); // paired with tablet.removeScan(this) in close()
 +      }
 +
 +      if (fileManager.getNumOpenFiles() != 0)
 +        throw new IllegalStateException("Tried to create new scan iterator w/o releasing files");
 +
 +      // set this before trying to get iterators in case
 +      // getIterators() throws an exception
 +      expectedDeletionCount = tablet.getDataSourceDeletions();
 +
 +      memIters = tablet.getTabletMemory().getIterators();
 +      Pair<Long,Map<FileRef,DataFileValue>> reservation = tablet.getDatafileManager().reserveFilesForScan();
 +      fileReservationId = reservation.getFirst();
 +      files = reservation.getSecond();
 +    }
 +    // the reserved files are opened after leaving the synchronized block
 +    Collection<InterruptibleIterator> mapfiles = fileManager.openFiles(files, options.isIsolated());
 +
 +    List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(mapfiles.size() + memIters.size());
 +
 +    iters.addAll(mapfiles);
 +    iters.addAll(memIters);
 +    // each source is cast to InterruptibleIterator so the scan's interrupt flag can be installed on it
 +    for (SortedKeyValueIterator<Key,Value> skvi : iters)
 +      ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag);
 +
 +    MultiIterator multiIter = new MultiIterator(iters, tablet.getExtent());
 +
 +    TabletIteratorEnvironment iterEnv = new TabletIteratorEnvironment(IteratorScope.scan, tablet.getTableConfiguration(), fileManager, files,
 +        options.getAuthorizations());
 +    // system iterator stack, bottom to top: stats -> deleting -> column family skipping -> column qualifier filter -> visibility filter
 +    statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, tablet.getScannedCounter());
 +
 +    DeletingIterator delIter = new DeletingIterator(statsIterator, false);
 +
 +    ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
 +
 +    ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, options.getColumnSet());
 +
 +    VisibilityFilter visFilter = new VisibilityFilter(colFilter, options.getAuthorizations(), options.getDefaultLabels());
 +
-     return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, tablet.getExtent(), tablet.getTableConfiguration(),
-         options.getSsiList(), options.getSsio(), iterEnv));
++    if (!loadIters) { // loadIters is false only for the constructor that takes no iterator settings
++      return visFilter;
++    } else {
++      return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, tablet.getExtent(), tablet.getTableConfiguration(),
++          options.getSsiList(), options.getSsio(), iterEnv));
++    }
 +  }
 +  // Returns held iterators and the file reservation, removes this scan from the tablet, releases the file manager (passing sawErrors), and reports stats.
 +  void close(boolean sawErrors) {
 +
 +    if (memIters != null) {
 +      tablet.getTabletMemory().returnIterators(memIters);
 +      memIters = null;
 +      tablet.getDatafileManager().returnFilesForScan(fileReservationId);
 +      fileReservationId = -1;
 +    }
 +
 +    synchronized (tablet) {
 +      if (tablet.removeScan(this) == 0) // NOTE(review): 0 presumably means no active scans remain — confirm against Tablet.removeScan
 +        tablet.notifyAll(); // wakes threads waiting on the tablet monitor
 +    }
 +
 +    if (fileManager != null) {
 +      fileManager.releaseOpenFiles(sawErrors);
 +      fileManager = null;
 +    }
 +
 +    if (statsIterator != null) {
 +      statsIterator.report();
 +    }
 +
 +  }
 +  // Sets the interrupt flag that createIterator() checks and installs on the underlying iterators.
 +  public void interrupt() {
 +    interruptFlag.set(true);
 +  }
 +  // Deep copies are not supported by this data source.
 +  @Override
 +  public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
 +    throw new UnsupportedOperationException();
 +  }
 +  // Reattaches the file manager, if one has been created.
 +  public void reattachFileManager() throws IOException {
 +    if (fileManager != null)
 +      fileManager.reattach();
 +  }
 +  // Detaches the file manager, if one has been created.
 +  public void detachFileManager() {
 +    if (fileManager != null)
 +      fileManager.detach();
 +  }
 +  // Not supported; in this class the interrupt flag is only assigned in the constructors.
 +  @Override
 +  public void setInterruptFlag(AtomicBoolean flag) {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +}