You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by uj...@apache.org on 2013/11/19 00:32:22 UTC

[1/9] git commit: ACCUMULO-1858 Backport ZooKeeper clean up to 1.4 and 1.5.

Updated Branches:
  refs/heads/1.6.0-SNAPSHOT 515cd9dfd -> c10ccf375


ACCUMULO-1858 Backport ZooKeeper clean up to 1.4 and 1.5.

This fix cherry-picks two commits:

ACCUMULO-1379 - Adding close() to Instance to assist in freeing up resources

(cherry picked from commit 7da1164d87227960d3e0cfc841f753067e2c0304)

Reason: bugfix
Author: John Vines <jv...@gmail.com>

Differs from original by path changes and leaving out ConditionalWriterTest, which is only in 1.6.0+

----

ACCUMULO-1379 Fix edge cases if error in closing ZooKeeperInstance

(cherry picked from commit 3f6c66ede52cb1fb5a122d7bad06d7978ff0a671)

Reason: bugfix
Author: Christopher Tubbs <ct...@apache.org>

Signed-off-by: Bill Slacum <uj...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/79d686fa
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/79d686fa
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/79d686fa

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 79d686faa1e477b9cbd80c6f833ece402050b490
Parents: a40a6d4
Author: Sean Busbey <bu...@clouderagovt.com>
Authored: Wed Nov 13 09:19:36 2013 -0600
Committer: Bill Slacum <uj...@apache.org>
Committed: Mon Nov 18 13:16:18 2013 -0500

----------------------------------------------------------------------
 .../apache/accumulo/core/client/Instance.java   |   7 ++
 .../accumulo/core/client/ZooKeeperInstance.java | 109 +++++++++++++------
 .../core/client/impl/ThriftTransportPool.java   |  16 ++-
 .../accumulo/core/client/mock/MockInstance.java |   5 +
 .../apache/accumulo/core/util/ThriftUtil.java   |   4 +
 .../accumulo/core/zookeeper/ZooCache.java       |   7 ++
 .../accumulo/core/zookeeper/ZooReader.java      |   4 +
 .../core/client/impl/TabletLocatorImplTest.java |   5 +
 .../accumulo/server/client/HdfsZooInstance.java |   9 ++
 9 files changed, 128 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java b/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java
index b3d09ba..1820e7a 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/Instance.java
@@ -126,6 +126,13 @@ public interface Instance {
    *           when a user's credentials are invalid
    */
   public abstract Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException;
+
+  /**
+   * Closes up the instance to free up all associated resources. You should try to reuse an Instance as much as you can because there is some location caching
+   * stored which will enhance performance.
+   * @throws AccumuloException 
+   */
+  public abstract void close() throws AccumuloException;
   
   /**
    * Returns the AccumuloConfiguration to use when interacting with this instance.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java b/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
index f657c07..1dae711 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.impl.ConnectorImpl;
@@ -33,6 +34,7 @@ import org.apache.accumulo.core.util.ByteBufferUtil;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.OpTimer;
 import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.zookeeper.ZooCache;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.hadoop.fs.FileStatus;
@@ -57,18 +59,20 @@ import org.apache.log4j.Logger;
  */
 
 public class ZooKeeperInstance implements Instance {
-  
+
   private static final Logger log = Logger.getLogger(ZooKeeperInstance.class);
-  
+
   private String instanceId = null;
   private String instanceName = null;
-  
+
   private ZooCache zooCache;
-  
+
   private String zooKeepers;
-  
+
   private int zooKeepersSessionTimeOut;
-  
+
+  private volatile boolean closed = false;
+
   /**
    * 
    * @param instanceName
@@ -76,11 +80,11 @@ public class ZooKeeperInstance implements Instance {
    * @param zooKeepers
    *          A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
    */
-  
+
   public ZooKeeperInstance(String instanceName, String zooKeepers) {
     this(instanceName, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
   }
-  
+
   /**
    * 
    * @param instanceName
@@ -90,7 +94,7 @@ public class ZooKeeperInstance implements Instance {
    * @param sessionTimeout
    *          zoo keeper session time out in milliseconds.
    */
-  
+
   public ZooKeeperInstance(String instanceName, String zooKeepers, int sessionTimeout) {
     ArgumentChecker.notNull(instanceName, zooKeepers);
     this.instanceName = instanceName;
@@ -98,8 +102,9 @@ public class ZooKeeperInstance implements Instance {
     this.zooKeepersSessionTimeOut = sessionTimeout;
     zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout);
     getInstanceID();
+    clientInstances.incrementAndGet();
   }
-  
+
   /**
    * 
    * @param instanceId
@@ -107,11 +112,11 @@ public class ZooKeeperInstance implements Instance {
    * @param zooKeepers
    *          A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
    */
-  
+
   public ZooKeeperInstance(UUID instanceId, String zooKeepers) {
     this(instanceId, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
   }
-  
+
   /**
    * 
    * @param instanceId
@@ -121,17 +126,20 @@ public class ZooKeeperInstance implements Instance {
    * @param sessionTimeout
    *          zoo keeper session time out in milliseconds.
    */
-  
+
   public ZooKeeperInstance(UUID instanceId, String zooKeepers, int sessionTimeout) {
     ArgumentChecker.notNull(instanceId, zooKeepers);
     this.instanceId = instanceId.toString();
     this.zooKeepers = zooKeepers;
     this.zooKeepersSessionTimeOut = sessionTimeout;
     zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout);
+    clientInstances.incrementAndGet();
   }
-  
+
   @Override
   public String getInstanceID() {
+    if (closed)
+      throw new RuntimeException("ZooKeeperInstance has been closed.");
     if (instanceId == null) {
       // want the instance id to be stable for the life of this instance object,
       // so only get it once
@@ -143,95 +151,103 @@ public class ZooKeeperInstance implements Instance {
       }
       instanceId = new String(iidb);
     }
-    
+
     if (zooCache.get(Constants.ZROOT + "/" + instanceId) == null) {
       if (instanceName == null)
         throw new RuntimeException("Instance id " + instanceId + " does not exist in zookeeper");
       throw new RuntimeException("Instance id " + instanceId + " pointed to by the name " + instanceName + " does not exist in zookeeper");
     }
-    
+
     return instanceId;
   }
-  
+
   @Override
   public List<String> getMasterLocations() {
+    if (closed)
+      throw new RuntimeException("ZooKeeperInstance has been closed.");
     String masterLocPath = ZooUtil.getRoot(this) + Constants.ZMASTER_LOCK;
-    
+
     OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up master location in zoocache.");
     byte[] loc = ZooUtil.getLockData(zooCache, masterLocPath);
     opTimer.stop("Found master at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
-    
+
     if (loc == null) {
       return Collections.emptyList();
     }
-    
+
     return Collections.singletonList(new String(loc));
   }
-  
+
   @Override
   public String getRootTabletLocation() {
+    if (closed)
+      throw new RuntimeException("ZooKeeperInstance has been closed.");
     String zRootLocPath = ZooUtil.getRoot(this) + Constants.ZROOT_TABLET_LOCATION;
-    
+
     OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up root tablet location in zookeeper.");
     byte[] loc = zooCache.get(zRootLocPath);
     opTimer.stop("Found root tablet at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
-    
+
     if (loc == null) {
       return null;
     }
-    
+
     return new String(loc).split("\\|")[0];
   }
-  
+
   @Override
   public String getInstanceName() {
+    if (closed)
+      throw new RuntimeException("ZooKeeperInstance has been closed.");
     if (instanceName == null)
       instanceName = lookupInstanceName(zooCache, UUID.fromString(getInstanceID()));
-    
+
     return instanceName;
   }
-  
+
   @Override
   public String getZooKeepers() {
     return zooKeepers;
   }
-  
+
   @Override
   public int getZooKeepersSessionTimeOut() {
     return zooKeepersSessionTimeOut;
   }
-  
+
   @Override
   public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException {
     return getConnector(user, TextUtil.getBytes(new Text(pass.toString())));
   }
-  
+
   @Override
   public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException {
     return getConnector(user, ByteBufferUtil.toBytes(pass));
   }
-  
+
   // Suppress deprecation, ConnectorImpl is deprecated to warn clients against using.
   @SuppressWarnings("deprecation")
   @Override
   public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException {
+    if (closed)
+      throw new RuntimeException("ZooKeeperInstance has been closed.");
     return new ConnectorImpl(this, user, pass);
   }
-  
+
   private AccumuloConfiguration conf = null;
-  
+
   @Override
   public AccumuloConfiguration getConfiguration() {
     if (conf == null)
       conf = AccumuloConfiguration.getDefaultConfiguration();
     return conf;
   }
-  
+
   @Override
   public void setConfiguration(AccumuloConfiguration conf) {
     this.conf = conf;
   }
-  
+
   /**
    * Given a zooCache and instanceId, look up the instance name.
    * 
@@ -277,4 +293,27 @@ public class ZooKeeperInstance implements Instance {
   public Connector getConnector(AuthInfo auth) throws AccumuloException, AccumuloSecurityException {
     return getConnector(auth.user, auth.password);
   }
+
+  static private final AtomicInteger clientInstances = new AtomicInteger(0);
+
+  @Override
+  public synchronized void close() throws AccumuloException {
+    if (!closed && clientInstances.decrementAndGet() == 0) {
+      try {
+        zooCache.close();
+        ThriftUtil.close();
+      } catch (InterruptedException e) {
+        clientInstances.incrementAndGet();
+        throw new AccumuloException("Issues closing ZooKeeper.");
+      }
+      closed = true;
+    }
+  }
+
+  @Override
+  public void finalize() {
+    // No cleanup is performed here on purpose; this only logs a warning. Users need to explicitly close Instances if they want things cleaned up nicely.
+    if (!closed)
+      log.warn("ZooKeeperInstance being cleaned up without being closed. Please remember to call close() before dereferencing to clean up threads.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index ef3724b..f969f28 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -80,13 +80,15 @@ public class ThriftTransportPool {
   
   private static class Closer implements Runnable {
     ThriftTransportPool pool;
+    final AtomicBoolean stop;
     
-    public Closer(ThriftTransportPool pool) {
+    public Closer(ThriftTransportPool pool, AtomicBoolean stop) {
       this.pool = pool;
+      this.stop = stop;
     }
     
     public void run() {
-      while (true) {
+      while (!stop.get()) {
         
         ArrayList<CachedConnection> connectionsToClose = new ArrayList<CachedConnection>();
         
@@ -592,6 +594,7 @@ public class ThriftTransportPool {
 
   private static ThriftTransportPool instance = new ThriftTransportPool();
   private static final AtomicBoolean daemonStarted = new AtomicBoolean(false);
+  private static AtomicBoolean stopDaemon;
   
   public static ThriftTransportPool getInstance() {
     SecurityManager sm = System.getSecurityManager();
@@ -600,8 +603,15 @@ public class ThriftTransportPool {
     }
     
     if (daemonStarted.compareAndSet(false, true)) {
-      new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start();
+      stopDaemon = new AtomicBoolean(false);
+      new Daemon(new Closer(instance, stopDaemon), "Thrift Connection Pool Checker").start();
     }
     return instance;
   }
+  
+  public static void close() {
+    if (daemonStarted.compareAndSet(true, false)) {
+      stopDaemon.set(true);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java b/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
index 2ff7b82..d8a15e0 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
@@ -140,4 +140,9 @@ public class MockInstance implements Instance {
   public Connector getConnector(AuthInfo auth) throws AccumuloException, AccumuloSecurityException {
     return getConnector(auth.user, auth.password);
   }
+
+  @Override
+  public void close() throws AccumuloException {
+    // NOOP
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java b/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
index 1b1cdd7..3684ecd 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
@@ -165,4 +165,8 @@ public class ThriftUtil {
   public static TProtocolFactory protocolFactory() {
     return protocolFactory;
   }
+  
+  public static void close() {
+    ThriftTransportPool.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
index f5bdd6b..0a36923 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
@@ -307,4 +307,11 @@ public class ZooCache {
     
     return zc;
   }
+  
+  public void close() throws InterruptedException {
+    cache.clear();
+    statCache.clear();
+    childrenCache.clear();
+    zReader.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java
index 47663ac..1bcd22b 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java
@@ -107,4 +107,8 @@ public class ZooReader implements IZooReader {
   public ZooReader(Instance instance) {
     this(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
   }
+
+  public void close() throws InterruptedException {
+    getZooKeeper().close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
index 538cb6c..624a824 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
@@ -448,6 +448,11 @@ public class TabletLocatorImplTest extends TestCase {
     public Connector getConnector(AuthInfo auth) throws AccumuloException, AccumuloSecurityException {
       return getConnector(auth.user, auth.password);
     }
+    
+    @Override
+    public void close() throws AccumuloException {
+      // NOOP
+    }
   }
   
   static class TServers {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79d686fa/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java b/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
index e6cdb63..d68449d 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
@@ -177,6 +177,15 @@ public class HdfsZooInstance implements Instance {
     System.out.println("ZooKeepers: " + instance.getZooKeepers());
     System.out.println("Masters: " + StringUtil.join(instance.getMasterLocations(), ", "));
   }
+
+  @Override
+  public void close() throws AccumuloException {
+    try {
+      zooCache.close();
+    } catch (InterruptedException e) {
+      throw new AccumuloException("Issues closing ZooKeeper, try again");
+    }
+  }
   
   @Override
   public Connector getConnector(AuthInfo auth) throws AccumuloException, AccumuloSecurityException {


[8/9] git commit: ACCUMULO-1833 Account for race condition in test where mutations are flushed immediately.

Posted by uj...@apache.org.
ACCUMULO-1833 Account for race condition in test where mutations are flushed immediately.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/404e955e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/404e955e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/404e955e

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 404e955ece6b85bd77e3afc5c641b8eb823d547b
Parents: 60dd8bd
Author: Josh Elser <el...@apache.org>
Authored: Mon Nov 18 17:42:18 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Mon Nov 18 17:44:32 2013 -0500

----------------------------------------------------------------------
 .../test/MultiTableBatchWriterTest.java         | 50 ++++++++++++++++----
 1 file changed, 40 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/404e955e/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java b/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java
index c5290e4..9ee1e6e 100644
--- a/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java
@@ -340,7 +340,8 @@ public class MultiTableBatchWriterTest {
 
     TCredentials creds = CredentialHelper.create("root", password, instance.getInstanceID());
     MultiTableBatchWriter mtbw = new MultiTableBatchWriterImpl(instance, creds, config, 60, TimeUnit.SECONDS);
-
+    boolean mutationsRejected = false;
+    
     try {
       final String table1 = "testTableDelete_table1", table2 = "testTableDelete_table2";
 
@@ -364,19 +365,27 @@ public class MultiTableBatchWriterTest {
       m2.put("col1", "", "val1");
       m2.put("col2", "", "val2");
 
-      bw1.addMutation(m2);
-      bw2.addMutation(m2);
+      try {
+        bw1.addMutation(m2);
+        bw2.addMutation(m2);
+      } catch (MutationsRejectedException e) {
+        // Pass - Mutations might flush immediately
+        mutationsRejected = true;
+      }
 
     } finally {
       if (null != mtbw) {
         try {
+          // Mutations might have flushed before the table offline occurred
           mtbw.close();
-          Assert.fail("Should not be able to close batch writers");
         } catch (MutationsRejectedException e) {
           // Pass
+          mutationsRejected = true;
         }
       }
     }
+    
+    Assert.assertTrue("Expected mutations to be rejected.", mutationsRejected);
   }
 
   @Test
@@ -389,6 +398,7 @@ public class MultiTableBatchWriterTest {
 
     TCredentials creds = CredentialHelper.create("root", password, instance.getInstanceID());
     MultiTableBatchWriter mtbw = new MultiTableBatchWriterImpl(instance, creds, config, 60, TimeUnit.SECONDS);
+    boolean mutationsRejected = false;
 
     try {
       final String table1 = "testOfflineTable_table1", table2 = "testOfflineTable_table2";
@@ -413,19 +423,26 @@ public class MultiTableBatchWriterTest {
       m2.put("col1", "", "val1");
       m2.put("col2", "", "val2");
 
-      bw1.addMutation(m2);
-      bw2.addMutation(m2);
+      try {
+        bw1.addMutation(m2);
+        bw2.addMutation(m2);
+      } catch (MutationsRejectedException e) {
+        // Pass -- Mutations might flush immediately and fail because of offline table
+        mutationsRejected = true;
+      }
     } finally {
       if (null != mtbw) {
         try {
+          // Mutations might have flushed before the table offline occurred
           mtbw.close();
-          Assert.fail("Should not be able to close batch writers");
         } catch (MutationsRejectedException e) {
           // Pass
+          mutationsRejected = true;
         }
       }
-
     }
+    
+    Assert.assertTrue("Expected mutations to be rejected.", mutationsRejected);
   }
 
   @Test
@@ -438,6 +455,7 @@ public class MultiTableBatchWriterTest {
     
     TCredentials creds = CredentialHelper.create("root", password, instance.getInstanceID());
     MultiTableBatchWriter mtbw = new MultiTableBatchWriterImpl(instance, creds, config, 60, TimeUnit.SECONDS);
+    boolean mutationsRejected = false;
 
     try {
       final String table1 = "testOfflineTableWithCache_table1", table2 = "testOfflineTableWithCache_table2";
@@ -461,6 +479,7 @@ public class MultiTableBatchWriterTest {
         bw1 = mtbw.getBatchWriter(table1);
       } catch (TableOfflineException e) {
         // pass
+        mutationsRejected = true;
       }
 
       tops.offline(table2);
@@ -469,17 +488,21 @@ public class MultiTableBatchWriterTest {
         bw2 = mtbw.getBatchWriter(table2);
       } catch (TableOfflineException e) {
         // pass
+        mutationsRejected = true;
       }
     } finally {
       if (null != mtbw) {
         try {
+          // Mutations might have flushed before the table offline occurred
           mtbw.close();
-          Assert.fail("Expecting close on MTBW to fail due to offline tables");
         } catch (MutationsRejectedException e) {
           // Pass
+          mutationsRejected = true;
         }
       }
     }
+
+    Assert.assertTrue("Expected mutations to be rejected.", mutationsRejected);
   }
 
   @Test
@@ -492,6 +515,7 @@ public class MultiTableBatchWriterTest {
 
     TCredentials creds = CredentialHelper.create("root", password, instance.getInstanceID());
     MultiTableBatchWriter mtbw = new MultiTableBatchWriterImpl(instance, creds, config, 0, TimeUnit.SECONDS);
+    boolean mutationsRejected = false;
 
     try {
       final String table1 = "testOfflineTableWithoutCache_table1", table2 = "testOfflineTableWithoutCache_table2";
@@ -509,6 +533,7 @@ public class MultiTableBatchWriterTest {
       bw1.addMutation(m1);
       bw2.addMutation(m1);
 
+      // Mutations might or might not flush before the tables go offline
       tops.offline(table1);
       tops.offline(table2);
 
@@ -517,6 +542,7 @@ public class MultiTableBatchWriterTest {
         Assert.fail(table1 + " should be offline");
       } catch (TableOfflineException e) {
         // pass
+        mutationsRejected = true;
       }
 
       try {
@@ -524,16 +550,20 @@ public class MultiTableBatchWriterTest {
         Assert.fail(table1 + " should be offline");
       } catch (TableOfflineException e) {
         // pass
+        mutationsRejected = true;
       }
     } finally {
       if (null != mtbw) {
         try {
+          // Mutations might have flushed before the table offline occurred
           mtbw.close();
-          Assert.fail("Expecting close on MTBW to fail due to offline tables");
         } catch (MutationsRejectedException e) {
           // Pass
+          mutationsRejected = true;
         }
       }
     }
+
+    Assert.assertTrue("Expected mutations to be rejected.", mutationsRejected);
   }
 }


[9/9] git commit: Merge remote-tracking branch 'apache-committers/1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Posted by uj...@apache.org.
Merge remote-tracking branch 'apache-committers/1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Did a merge for ACCUMULO-1858 and picked up some "Elser" slack.

Conflicts:
	core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
	server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c10ccf37
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c10ccf37
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c10ccf37

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: c10ccf375b97c1882d5b89decd701bbdae7f71ef
Parents: 515cd9d 404e955
Author: Bill Slacum <uj...@apache.org>
Authored: Mon Nov 18 18:31:51 2013 -0500
Committer: Bill Slacum <uj...@apache.org>
Committed: Mon Nov 18 18:31:51 2013 -0500

----------------------------------------------------------------------
 .../apache/accumulo/core/client/Instance.java   | 14 +++---
 .../accumulo/core/client/ZooKeeperInstance.java |  6 ++-
 .../simple/client/RandomBatchWriter.java        | 20 ++++----
 .../test/MultiTableBatchWriterTest.java         | 50 ++++++++++++++++----
 4 files changed, 63 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/c10ccf37/core/src/main/java/org/apache/accumulo/core/client/Instance.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/Instance.java
index 95fc933,612301e..27d502f
--- a/core/src/main/java/org/apache/accumulo/core/client/Instance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Instance.java
@@@ -147,11 -169,5 +154,4 @@@ public interface Instance 
     * @since 1.5.0
     */
    public abstract Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException;
--  
-   /**
-    * Closes up the instance to free up all associated resources. You should try to reuse an Instance as much as you can because there is some location caching
-    * stored which will enhance performance.
-    * @throws AccumuloException 
-    */
-   public abstract void close() throws AccumuloException;
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c10ccf37/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c10ccf37/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java
index 8c6bcc6,9ee1e6e..f7e1146
--- a/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java
@@@ -339,9 -338,10 +339,10 @@@ public class MultiTableBatchWriterTest 
  
      BatchWriterConfig config = new BatchWriterConfig();
  
 -    TCredentials creds = CredentialHelper.create("root", password, instance.getInstanceID());
 +    Credentials creds = new Credentials("root", password);
      MultiTableBatchWriter mtbw = new MultiTableBatchWriterImpl(instance, creds, config, 60, TimeUnit.SECONDS);
- 
+     boolean mutationsRejected = false;
+     
      try {
        final String table1 = "testTableDelete_table1", table2 = "testTableDelete_table2";
  
@@@ -388,8 -396,9 +397,9 @@@
  
      BatchWriterConfig config = new BatchWriterConfig();
  
 -    TCredentials creds = CredentialHelper.create("root", password, instance.getInstanceID());
 +    Credentials creds = new Credentials("root", password);
      MultiTableBatchWriter mtbw = new MultiTableBatchWriterImpl(instance, creds, config, 60, TimeUnit.SECONDS);
+     boolean mutationsRejected = false;
  
      try {
        final String table1 = "testOfflineTable_table1", table2 = "testOfflineTable_table2";
@@@ -437,8 -453,9 +454,9 @@@
  
      BatchWriterConfig config = new BatchWriterConfig();
      
 -    TCredentials creds = CredentialHelper.create("root", password, instance.getInstanceID());
 +    Credentials creds = new Credentials("root", password);
      MultiTableBatchWriter mtbw = new MultiTableBatchWriterImpl(instance, creds, config, 60, TimeUnit.SECONDS);
+     boolean mutationsRejected = false;
  
      try {
        final String table1 = "testOfflineTableWithCache_table1", table2 = "testOfflineTableWithCache_table2";
@@@ -491,8 -513,9 +514,9 @@@
  
      BatchWriterConfig config = new BatchWriterConfig();
  
 -    TCredentials creds = CredentialHelper.create("root", password, instance.getInstanceID());
 +    Credentials creds = new Credentials("root", password);
      MultiTableBatchWriter mtbw = new MultiTableBatchWriterImpl(instance, creds, config, 0, TimeUnit.SECONDS);
+     boolean mutationsRejected = false;
  
      try {
        final String table1 = "testOfflineTableWithoutCache_table1", table2 = "testOfflineTableWithoutCache_table2";


[2/9] Merge remote-tracking branch 'apache-committers/1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT

Posted by uj...@apache.org.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/04f81b50/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
index ca7e42c,0000000..f12dca5
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
+++ b/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
@@@ -1,204 -1,0 +1,213 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.client;
 +
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.UUID;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.ZooKeeperInstance;
 +import org.apache.accumulo.core.client.impl.ConnectorImpl;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.security.CredentialHelper;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.core.util.ByteBufferUtil;
 +import org.apache.accumulo.core.util.OpTimer;
 +import org.apache.accumulo.core.util.StringUtil;
 +import org.apache.accumulo.core.util.TextUtil;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.ZooCache;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.zookeeper.ZooLock;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Level;
 +import org.apache.log4j.Logger;
 +
 +/**
 + * An implementation of Instance that looks in HDFS and ZooKeeper to find the master and root tablet location.
 + * 
 + */
 +public class HdfsZooInstance implements Instance {
 +
 +  public static class AccumuloNotInitializedException extends RuntimeException {
 +    private static final long serialVersionUID = 1L;
 +
 +    public AccumuloNotInitializedException(String string) {
 +      super(string);
 +    }
 +  }
 +
 +  private HdfsZooInstance() {
 +    AccumuloConfiguration acuConf = ServerConfiguration.getSiteConfiguration();
 +    zooCache = new ZooCache(acuConf.get(Property.INSTANCE_ZK_HOST), (int) acuConf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
 +  }
 +
 +  private static HdfsZooInstance cachedHdfsZooInstance = null;
 +
 +  public static synchronized Instance getInstance() {
 +    if (cachedHdfsZooInstance == null)
 +      cachedHdfsZooInstance = new HdfsZooInstance();
 +    return cachedHdfsZooInstance;
 +  }
 +
 +  private static ZooCache zooCache;
 +  private static String instanceId = null;
 +  private static final Logger log = Logger.getLogger(HdfsZooInstance.class);
 +
 +  /**
 +   * Reads the root tablet location entry from the ZooKeeper cache.
 +   *
 +   * @return the part of the cached value that precedes the first '|', or null when the cache holds no value for the path
 +   */
 +  @Override
 +  public String getRootTabletLocation() {
 +    final String rootLocationPath = ZooUtil.getRoot(this) + Constants.ZROOT_TABLET_LOCATION;
 +
 +    final OpTimer timer = new OpTimer(log, Level.TRACE).start("Looking up root tablet location in zoocache.");
 +    final byte[] raw = zooCache.get(rootLocationPath);
 +
 +    // Decode once; a missing entry shows up as the text "null" in the trace message.
 +    final String decoded = raw == null ? null : new String(raw);
 +    timer.stop("Found root tablet at " + decoded + " in %DURATION%");
 +
 +    return decoded == null ? null : decoded.split("\\|")[0];
 +  }
 +
 +  /**
 +   * Looks up the data stored with the master lock through the ZooKeeper cache.
 +   *
 +   * @return a single-element list holding the lock data as a string, or an empty list when no lock data is found
 +   */
 +  @Override
 +  public List<String> getMasterLocations() {
 +    final String masterLockPath = ZooUtil.getRoot(this) + Constants.ZMASTER_LOCK;
 +
 +    final OpTimer timer = new OpTimer(log, Level.TRACE).start("Looking up master location in zoocache.");
 +    final byte[] raw = ZooLock.getLockData(zooCache, masterLockPath, null);
 +
 +    // Decode once; a missing lock shows up as the text "null" in the trace message.
 +    final String decoded = raw == null ? null : new String(raw);
 +    timer.stop("Found master at " + decoded + " in %DURATION%");
 +
 +    if (decoded != null) {
 +      return Collections.singletonList(decoded);
 +    }
 +    return Collections.emptyList();
 +  }
 +
 +  @Override
 +  public String getInstanceID() {
 +    if (instanceId == null)
 +      _getInstanceID();
 +    return instanceId;
 +  }
 +
 +  /**
 +   * Populates the static instanceId field from the location returned by ServerConstants.getInstanceIdLocation().
 +   * The method is synchronized on the class and repeats the null check, so the lookup is skipped once the field is set.
 +   * NOTE(review): instanceId is not volatile and getInstanceID() reads it without holding this lock - confirm that is acceptable.
 +   */
 +  private static synchronized void _getInstanceID() {
 +    if (instanceId == null) {
 +      // The suppression is scoped to this one declaration, presumably because getInstanceIDFromHdfs is deprecated.
 +      @SuppressWarnings("deprecation")
 +      String instanceIdFromFile = ZooKeeperInstance.getInstanceIDFromHdfs(ServerConstants.getInstanceIdLocation());
 +      instanceId = instanceIdFromFile;
 +    }
 +  }
 +
 +  @Override
 +  public String getInstanceName() {
 +    return ZooKeeperInstance.lookupInstanceName(zooCache, UUID.fromString(getInstanceID()));
 +  }
 +
 +  @Override
 +  public String getZooKeepers() {
 +    return ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST);
 +  }
 +
 +  @Override
 +  public int getZooKeepersSessionTimeOut() {
 +    return (int) ServerConfiguration.getSiteConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
 +  }
 +
 +  @Override
 +  // Not for client use. Builds credentials from the principal, the token and this instance's ID, then delegates to the
 +  // private TCredentials overload. (This overload carries no @Deprecated annotation, unlike the password-based ones.)
 +  public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(CredentialHelper.create(principal, token, getInstanceID()));
 +  }
 +
 +  @SuppressWarnings("deprecation")
 +  private Connector getConnector(TCredentials cred) throws AccumuloException, AccumuloSecurityException {
 +    return new ConnectorImpl(this, cred);
 +  }
 +
 +  @Deprecated
 +  @Override
 +  // Not really deprecated, just not for client use
 +  public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(user, new PasswordToken(pass));
 +  }
 +
 +  @Deprecated
 +  @Override
 +  // Not really deprecated, just not for client use
 +  public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(user, ByteBufferUtil.toBytes(pass));
 +  }
 +
 +  @Deprecated
 +  @Override
 +  public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(user, TextUtil.getBytes(new Text(pass.toString())));
 +  }
 +
 +  private AccumuloConfiguration conf = null;
 +
 +  @Override
 +  public AccumuloConfiguration getConfiguration() {
 +    if (conf == null)
 +      conf = new ServerConfiguration(this).getConfiguration();
 +    return conf;
 +  }
 +
 +  @Override
 +  public void setConfiguration(AccumuloConfiguration conf) {
 +    this.conf = conf;
 +  }
 +
 +  public static void main(String[] args) {
 +    Instance instance = HdfsZooInstance.getInstance();
 +    System.out.println("Instance Name: " + instance.getInstanceName());
 +    System.out.println("Instance ID: " + instance.getInstanceID());
 +    System.out.println("ZooKeepers: " + instance.getZooKeepers());
 +    System.out.println("Masters: " + StringUtil.join(instance.getMasterLocations(), ", "));
 +  }
 +
++  /**
++   * Closes the static ZooKeeper cache used by this class.
++   *
++   * @throws AccumuloException
++   *           if the calling thread is interrupted while the cache is closing; the interrupt status is restored and the
++   *           InterruptedException is attached as the cause
++   */
++  @Override
++  public void close() throws AccumuloException {
++    try {
++      zooCache.close();
++    } catch (InterruptedException e) {
++      // Catching InterruptedException clears the flag; set it again so code further up the stack can still see it.
++      Thread.currentThread().interrupt();
++      // Chain the cause so the original stack trace is not lost.
++      throw new AccumuloException("Issues closing ZooKeeper, try again", e);
++    }
++  }
++  
 +  @Deprecated
 +  @Override
 +  public Connector getConnector(org.apache.accumulo.core.security.thrift.AuthInfo auth) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(auth.user, auth.getPassword());
 +  }
 +}


[3/9] Merge remote-tracking branch 'apache-committers/1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT

Posted by uj...@apache.org.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/04f81b50/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
index 8c63b1f,0000000..a42c280
mode 100644,000000..100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
@@@ -1,1280 -1,0 +1,1285 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.impl;
 +
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.TreeMap;
 +
 +import junit.framework.TestCase;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
 +import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocations;
 +import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations;
 +import org.apache.accumulo.core.client.impl.TabletLocatorImpl.TabletLocationObtainer;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.core.util.MetadataTable;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.hadoop.io.Text;
 +
 +public class TabletLocatorImplTest extends TestCase {
 +  
 +  private static final KeyExtent RTE = Constants.ROOT_TABLET_EXTENT;
 +  private static final KeyExtent MTE = new KeyExtent(new Text(Constants.METADATA_TABLE_ID), null, RTE.getEndRow());
 +  private static TCredentials credential = null;
 +  
 +  static KeyExtent nke(String t, String er, String per) {
 +    return new KeyExtent(new Text(t), er == null ? null : new Text(er), per == null ? null : new Text(per));
 +  }
 +  
 +  static Range nr(String k1, boolean si, String k2, boolean ei) {
 +    return new Range(k1 == null ? null : new Text(k1), si, k2 == null ? null : new Text(k2), ei);
 +  }
 +  
 +  static Range nr(String k1, String k2) {
 +    return new Range(k1 == null ? null : new Text(k1), k2 == null ? null : new Text(k2));
 +  }
 +  
 +  static List<Range> nrl(Range... ranges) {
 +    return Arrays.asList(ranges);
 +  }
 +  
 +  static Object[] nol(Object... objs) {
 +    return objs;
 +  }
 +  
 +  @SuppressWarnings("unchecked")
 +  static Map<String,Map<KeyExtent,List<Range>>> createExpectedBinnings(Object... data) {
 +    
 +    Map<String,Map<KeyExtent,List<Range>>> expBinnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
 +    
 +    for (int i = 0; i < data.length; i += 2) {
 +      String loc = (String) data[i];
 +      Object binData[] = (Object[]) data[i + 1];
 +      
 +      HashMap<KeyExtent,List<Range>> binnedKE = new HashMap<KeyExtent,List<Range>>();
 +      
 +      expBinnedRanges.put(loc, binnedKE);
 +      
 +      for (int j = 0; j < binData.length; j += 2) {
 +        KeyExtent ke = (KeyExtent) binData[j];
 +        List<Range> ranges = (List<Range>) binData[j + 1];
 +        
 +        binnedKE.put(ke, ranges);
 +      }
 +    }
 +    
 +    return expBinnedRanges;
 +  }
 +  
 +  /**
 +   * Builds an extent-keyed cache from alternating (KeyExtent, String location) arguments.
 +   */
 +  static TreeMap<KeyExtent,TabletLocation> createMetaCacheKE(Object... data) {
 +    TreeMap<KeyExtent,TabletLocation> byExtent = new TreeMap<KeyExtent,TabletLocation>();
 +    
 +    int next = 0;
 +    while (next < data.length) {
 +      // arguments arrive as pairs: the extent first, then its location
 +      KeyExtent extent = (KeyExtent) data[next++];
 +      String location = (String) data[next++];
 +      byExtent.put(extent, new TabletLocation(extent, location));
 +    }
 +    
 +    return byExtent;
 +  }
 +  
 +  /**
 +   * Builds a cache keyed by tablet end row from alternating (KeyExtent, String location) arguments. Extents without an
 +   * end row are stored under TabletLocatorImpl.MAX_TEXT.
 +   */
 +  static TreeMap<Text,TabletLocation> createMetaCache(Object... data) {
 +    TreeMap<Text,TabletLocation> byEndRow = new TreeMap<Text,TabletLocation>(TabletLocatorImpl.endRowComparator);
 +    
 +    for (Entry<KeyExtent,TabletLocation> cached : createMetaCacheKE(data).entrySet()) {
 +      Text endRow = cached.getKey().getEndRow();
 +      byEndRow.put(endRow == null ? TabletLocatorImpl.MAX_TEXT : endRow, cached.getValue());
 +    }
 +    
 +    return byEndRow;
 +  }
 +  
 +  static TabletLocatorImpl createLocators(TServers tservers, String rootTabLoc, String metaTabLoc, String table, Object... data) {
 +    
 +    TreeMap<KeyExtent,TabletLocation> mcke = createMetaCacheKE(data);
 +    
 +    TestTabletLocationObtainer ttlo = new TestTabletLocationObtainer(tservers);
 +    TestInstance testInstance = new TestInstance("instance1", "tserver1");
 +    
 +    RootTabletLocator rtl = new RootTabletLocator(testInstance);
 +    TabletLocatorImpl rootTabletCache = new TabletLocatorImpl(new Text(Constants.METADATA_TABLE_ID), rtl, ttlo);
 +    TabletLocatorImpl tab1TabletCache = new TabletLocatorImpl(new Text(table), rootTabletCache, ttlo);
 +    
 +    setLocation(tservers, rootTabLoc, RTE, MTE, metaTabLoc);
 +    
 +    for (Entry<KeyExtent,TabletLocation> entry : mcke.entrySet()) {
 +      setLocation(tservers, metaTabLoc, MTE, entry.getKey(), entry.getValue().tablet_location);
 +    }
 +    
 +    return tab1TabletCache;
 +    
 +  }
 +  
 +  static TabletLocatorImpl createLocators(String table, Object... data) {
 +    TServers tservers = new TServers();
 +    return createLocators(tservers, "tserver1", "tserver2", table, data);
 +  }
 +  
 +  private void runTest(Text tableName, List<Range> ranges, TabletLocatorImpl tab1TabletCache, Map<String,Map<KeyExtent,List<Range>>> expected) throws Exception {
 +    List<Range> failures = Collections.emptyList();
 +    runTest(tableName, ranges, tab1TabletCache, expected, failures);
 +  }
 +  
 +  private void runTest(Text tableName, List<Range> ranges, TabletLocatorImpl tab1TabletCache, Map<String,Map<KeyExtent,List<Range>>> expected,
 +      List<Range> efailures) throws Exception {
 +    
 +    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
 +    List<Range> f = tab1TabletCache.binRanges(ranges, binnedRanges, credential);
 +    assertEquals(expected, binnedRanges);
 +    
 +    HashSet<Range> f1 = new HashSet<Range>(f);
 +    HashSet<Range> f2 = new HashSet<Range>(efailures);
 +    
 +    assertEquals(f2, f1);
 +  }
 +  
 +  /**
 +   * Collects the given extents into a set.
 +   */
 +  static Set<KeyExtent> nkes(KeyExtent... extents) {
 +    return new HashSet<KeyExtent>(Arrays.asList(extents));
 +  }
 +  
 +  /**
 +   * Removes every cache entry overlapping the given extent and asserts that exactly the expected extents remain.
 +   */
 +  static void runTest(TreeMap<Text,TabletLocation> mc, KeyExtent remove, Set<KeyExtent> expected) {
 +    // operate on a private copy so the caller can reuse the same cache for further checks
 +    TreeMap<Text,TabletLocation> scratch = new TreeMap<Text,TabletLocation>(mc);
 +    TabletLocatorImpl.removeOverlapping(scratch, remove);
 +    
 +    Set<KeyExtent> remaining = new HashSet<KeyExtent>();
 +    for (TabletLocation survivor : scratch.values()) {
 +      remaining.add(survivor.tablet_extent);
 +    }
 +    
 +    assertEquals(expected, remaining);
 +  }
 +  
 +  /**
 +   * Creates a mutation for the row from entries written as "family:qualifier=value".
 +   */
 +  static Mutation nm(String row, String... data) {
 +    Mutation result = new Mutation(new Text(row));
 +    
 +    for (String spec : data) {
 +      // "family:qualifier=value" -> ["family:qualifier", "value"] -> ["family", "qualifier"]
 +      String[] columnAndValue = spec.split("=");
 +      String[] familyAndQualifier = columnAndValue[0].split(":");
 +      
 +      result.put(new Text(familyAndQualifier[0]), new Text(familyAndQualifier[1]), new Value(columnAndValue[1].getBytes()));
 +    }
 +    
 +    return result;
 +  }
 +  
 +  static List<Mutation> nml(Mutation... ma) {
 +    return Arrays.asList(ma);
 +  }
 +  
 +  private void runTest(TabletLocatorImpl metaCache, List<Mutation> ml, Map<String,Map<KeyExtent,List<String>>> emb, String... efailures) throws Exception {
 +    Map<String,TabletServerMutations> binnedMutations = new HashMap<String,TabletServerMutations>();
 +    List<Mutation> afailures = new ArrayList<Mutation>();
 +    metaCache.binMutations(ml, binnedMutations, afailures, credential);
 +    
 +    verify(emb, binnedMutations);
 +    
 +    ArrayList<String> afs = new ArrayList<String>();
 +    ArrayList<String> efs = new ArrayList<String>(Arrays.asList(efailures));
 +    
 +    for (Mutation mutation : afailures) {
 +      afs.add(new String(mutation.getRow()));
 +    }
 +    
 +    Collections.sort(afs);
 +    Collections.sort(efs);
 +    
 +    assertEquals(efs, afs);
 +    
 +  }
 +  
 +  private void verify(Map<String,Map<KeyExtent,List<String>>> expected, Map<String,TabletServerMutations> actual) {
 +    assertEquals(expected.keySet(), actual.keySet());
 +    
 +    for (String server : actual.keySet()) {
 +      TabletServerMutations atb = actual.get(server);
 +      Map<KeyExtent,List<String>> etb = expected.get(server);
 +      
 +      assertEquals(etb.keySet(), atb.getMutations().keySet());
 +      
 +      for (KeyExtent ke : etb.keySet()) {
 +        ArrayList<String> eRows = new ArrayList<String>(etb.get(ke));
 +        ArrayList<String> aRows = new ArrayList<String>();
 +        
 +        for (Mutation m : atb.getMutations().get(ke)) {
 +          aRows.add(new String(m.getRow()));
 +        }
 +        
 +        Collections.sort(eRows);
 +        Collections.sort(aRows);
 +        
 +        assertEquals(eRows, aRows);
 +      }
 +    }
 +    
 +  }
 +  
 +  /**
 +   * Groups (row, server, extent) triples into a server -> extent -> rows map describing the expected mutation binning.
 +   */
 +  static Map<String,Map<KeyExtent,List<String>>> cemb(Object[]... ols) {
 +    Map<String,Map<KeyExtent,List<String>>> expectedBins = new HashMap<String,Map<KeyExtent,List<String>>>();
 +    
 +    for (Object[] triple : ols) {
 +      String row = (String) triple[0];
 +      String server = (String) triple[1];
 +      KeyExtent extent = (KeyExtent) triple[2];
 +      
 +      // create the per-server bucket on first sight of the server
 +      if (expectedBins.get(server) == null) {
 +        expectedBins.put(server, new HashMap<KeyExtent,List<String>>());
 +      }
 +      Map<KeyExtent,List<String>> perServer = expectedBins.get(server);
 +      
 +      // create the per-extent row list on first sight of the extent
 +      if (perServer.get(extent) == null) {
 +        perServer.put(extent, new ArrayList<String>());
 +      }
 +      perServer.get(extent).add(row);
 +    }
 +    
 +    return expectedBins;
 +  }
 +  
 +  public void testRemoveOverlapping1() {
 +    TreeMap<Text,TabletLocation> mc = createMetaCache(nke("0", null, null), "l1");
 +    
 +    runTest(mc, nke("0", "a", null), nkes());
 +    runTest(mc, nke("0", null, null), nkes());
 +    runTest(mc, nke("0", null, "a"), nkes());
 +    
 +    mc = createMetaCache(nke("0", "g", null), "l1", nke("0", "r", "g"), "l1", nke("0", null, "r"), "l1");
 +    runTest(mc, nke("0", null, null), nkes());
 +    
 +    runTest(mc, nke("0", "a", null), nkes(nke("0", "r", "g"), nke("0", null, "r")));
 +    runTest(mc, nke("0", "g", null), nkes(nke("0", "r", "g"), nke("0", null, "r")));
 +    runTest(mc, nke("0", "h", null), nkes(nke("0", null, "r")));
 +    runTest(mc, nke("0", "r", null), nkes(nke("0", null, "r")));
 +    runTest(mc, nke("0", "s", null), nkes());
 +    
 +    runTest(mc, nke("0", "b", "a"), nkes(nke("0", "r", "g"), nke("0", null, "r")));
 +    runTest(mc, nke("0", "g", "a"), nkes(nke("0", "r", "g"), nke("0", null, "r")));
 +    runTest(mc, nke("0", "h", "a"), nkes(nke("0", null, "r")));
 +    runTest(mc, nke("0", "r", "a"), nkes(nke("0", null, "r")));
 +    runTest(mc, nke("0", "s", "a"), nkes());
 +    
 +    runTest(mc, nke("0", "h", "g"), nkes(nke("0", "g", null), nke("0", null, "r")));
 +    runTest(mc, nke("0", "r", "g"), nkes(nke("0", "g", null), nke("0", null, "r")));
 +    runTest(mc, nke("0", "s", "g"), nkes(nke("0", "g", null)));
 +    
 +    runTest(mc, nke("0", "i", "h"), nkes(nke("0", "g", null), nke("0", null, "r")));
 +    runTest(mc, nke("0", "r", "h"), nkes(nke("0", "g", null), nke("0", null, "r")));
 +    runTest(mc, nke("0", "s", "h"), nkes(nke("0", "g", null)));
 +    
 +    runTest(mc, nke("0", "z", "f"), nkes());
 +    runTest(mc, nke("0", "z", "g"), nkes(nke("0", "g", null)));
 +    runTest(mc, nke("0", "z", "q"), nkes(nke("0", "g", null)));
 +    runTest(mc, nke("0", "z", "r"), nkes(nke("0", "g", null), nke("0", "r", "g")));
 +    runTest(mc, nke("0", "z", "s"), nkes(nke("0", "g", null), nke("0", "r", "g")));
 +    
 +    runTest(mc, nke("0", null, "f"), nkes());
 +    runTest(mc, nke("0", null, "g"), nkes(nke("0", "g", null)));
 +    runTest(mc, nke("0", null, "q"), nkes(nke("0", "g", null)));
 +    runTest(mc, nke("0", null, "r"), nkes(nke("0", "g", null), nke("0", "r", "g")));
 +    runTest(mc, nke("0", null, "s"), nkes(nke("0", "g", null), nke("0", "r", "g")));
 +    
 +  }
 +  
 +  public void testRemoveOverlapping2() {
 +    
 +    // test removes when cache does not contain all tablets in a table
 +    TreeMap<Text,TabletLocation> mc = createMetaCache(nke("0", "r", "g"), "l1", nke("0", null, "r"), "l1");
 +    
 +    runTest(mc, nke("0", "a", null), nkes(nke("0", "r", "g"), nke("0", null, "r")));
 +    runTest(mc, nke("0", "g", null), nkes(nke("0", "r", "g"), nke("0", null, "r")));
 +    runTest(mc, nke("0", "h", null), nkes(nke("0", null, "r")));
 +    runTest(mc, nke("0", "r", null), nkes(nke("0", null, "r")));
 +    runTest(mc, nke("0", "s", null), nkes());
 +    
 +    runTest(mc, nke("0", "b", "a"), nkes(nke("0", "r", "g"), nke("0", null, "r")));
 +    runTest(mc, nke("0", "g", "a"), nkes(nke("0", "r", "g"), nke("0", null, "r")));
 +    runTest(mc, nke("0", "h", "a"), nkes(nke("0", null, "r")));
 +    runTest(mc, nke("0", "r", "a"), nkes(nke("0", null, "r")));
 +    runTest(mc, nke("0", "s", "a"), nkes());
 +    
 +    runTest(mc, nke("0", "h", "g"), nkes(nke("0", null, "r")));
 +    
 +    mc = createMetaCache(nke("0", "g", null), "l1", nke("0", null, "r"), "l1");
 +    
 +    runTest(mc, nke("0", "h", "g"), nkes(nke("0", "g", null), nke("0", null, "r")));
 +    runTest(mc, nke("0", "h", "a"), nkes(nke("0", null, "r")));
 +    runTest(mc, nke("0", "s", "g"), nkes(nke("0", "g", null)));
 +    runTest(mc, nke("0", "s", "a"), nkes());
 +    
 +    mc = createMetaCache(nke("0", "g", null), "l1", nke("0", "r", "g"), "l1");
 +    
 +    runTest(mc, nke("0", "z", "f"), nkes());
 +    runTest(mc, nke("0", "z", "g"), nkes(nke("0", "g", null)));
 +    runTest(mc, nke("0", "z", "q"), nkes(nke("0", "g", null)));
 +    runTest(mc, nke("0", "z", "r"), nkes(nke("0", "g", null), nke("0", "r", "g")));
 +    runTest(mc, nke("0", "z", "s"), nkes(nke("0", "g", null), nke("0", "r", "g")));
 +    
 +    runTest(mc, nke("0", null, "f"), nkes());
 +    runTest(mc, nke("0", null, "g"), nkes(nke("0", "g", null)));
 +    runTest(mc, nke("0", null, "q"), nkes(nke("0", "g", null)));
 +    runTest(mc, nke("0", null, "r"), nkes(nke("0", "g", null), nke("0", "r", "g")));
 +    runTest(mc, nke("0", null, "s"), nkes(nke("0", "g", null), nke("0", "r", "g")));
 +  }
 +  
 +  static class TestInstance implements Instance {
 +    
 +    private final String iid;
 +    private String rtl;
 +    
 +    public TestInstance(String iid, String rtl) {
 +      this.iid = iid;
 +      this.rtl = rtl;
 +    }
 +    
 +    @Override
 +    public String getInstanceID() {
 +      return iid;
 +    }
 +    
 +    @Override
 +    public String getInstanceName() {
 +      throw new UnsupportedOperationException();
 +    }
 +    
 +    @Override
 +    public List<String> getMasterLocations() {
 +      throw new UnsupportedOperationException();
 +    }
 +    
 +    @Override
 +    public String getRootTabletLocation() {
 +      return rtl;
 +    }
 +    
 +    @Override
 +    public String getZooKeepers() {
 +      throw new UnsupportedOperationException();
 +    }
 +    
 +    @Override
 +    public int getZooKeepersSessionTimeOut() {
 +      throw new UnsupportedOperationException();
 +    }
 +    
 +    public void setRootTabletLocation(String rtl) {
 +      this.rtl = rtl;
 +    }
 +    
 +    @Override
 +    @Deprecated
 +    public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException {
 +      throw new UnsupportedOperationException();
 +    }
 +    
 +    @Override
 +    @Deprecated
 +    public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException {
 +      throw new UnsupportedOperationException();
 +    }
 +    
 +    AccumuloConfiguration conf = AccumuloConfiguration.getDefaultConfiguration();
 +    
 +    @Override
 +    public AccumuloConfiguration getConfiguration() {
 +      return conf;
 +    }
 +    
 +    @Override
 +    public void setConfiguration(AccumuloConfiguration conf) {
 +      this.conf = conf;
 +    }
 +    
 +    @Override
 +    @Deprecated
 +    public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException {
 +      throw new UnsupportedOperationException();
 +    }
 +    
 +    @Deprecated
 +    @Override
 +    public Connector getConnector(org.apache.accumulo.core.security.thrift.AuthInfo auth) throws AccumuloException, AccumuloSecurityException {
 +      return getConnector(auth.user, auth.getPassword());
 +    }
 +
 +    @Override
 +    public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
 +      throw new UnsupportedOperationException();
 +    }
++    
++    @Override
++    public void close() throws AccumuloException {
++      // NOOP
++    }
 +  }
 +  
 +  static class TServers {
 +    private final Map<String,Map<KeyExtent,SortedMap<Key,Value>>> tservers = new HashMap<String,Map<KeyExtent,SortedMap<Key,Value>>>();
 +  }
 +  
 +  static class TestTabletLocationObtainer implements TabletLocationObtainer {
 +    
 +    private final Map<String,Map<KeyExtent,SortedMap<Key,Value>>> tservers;
 +    
 +    TestTabletLocationObtainer(TServers tservers) {
 +      this.tservers = tservers.tservers;
 +    }
 +    
 +    @Override
 +    public TabletLocations lookupTablet(TabletLocation src, Text row, Text stopRow, TabletLocator parent, TCredentials credentials) throws AccumuloSecurityException {
 +      
 +      // System.out.println("lookupTablet("+src+","+row+","+stopRow+","+ parent+")");
 +      // System.out.println(tservers);
 +      
 +      ArrayList<TabletLocation> list = new ArrayList<TabletLocation>();
 +      
 +      Map<KeyExtent,SortedMap<Key,Value>> tablets = tservers.get(src.tablet_location);
 +      
 +      if (tablets == null) {
 +        parent.invalidateCache(src.tablet_location);
 +        return null;
 +      }
 +      
 +      SortedMap<Key,Value> tabletData = tablets.get(src.tablet_extent);
 +      
 +      if (tabletData == null) {
 +        parent.invalidateCache(src.tablet_extent);
 +        return null;
 +      }
 +      
 +      // the following clip is done on a tablet, do it here to see if it throws exceptions
 +      src.tablet_extent.toDataRange().clip(new Range(row, true, stopRow, true));
 +      
 +      Key startKey = new Key(row);
 +      Key stopKey = new Key(stopRow).followingKey(PartialKey.ROW);
 +      
 +      SortedMap<Key,Value> results = tabletData.tailMap(startKey).headMap(stopKey);
 +      
 +      Pair<SortedMap<KeyExtent,Text>,List<KeyExtent>> metadata = MetadataTable.getMetadataLocationEntries(results);
 +      
 +      for (Entry<KeyExtent,Text> entry : metadata.getFirst().entrySet()) {
 +        list.add(new TabletLocation(entry.getKey(), entry.getValue().toString()));
 +      }
 +      
 +      return new TabletLocations(list, metadata.getSecond());
 +    }
 +    
 +    @Override
 +    public List<TabletLocation> lookupTablets(String tserver, Map<KeyExtent,List<Range>> map, TabletLocator parent, TCredentials credentials)
 +        throws AccumuloSecurityException {
 +      
 +      ArrayList<TabletLocation> list = new ArrayList<TabletLocation>();
 +      
 +      Map<KeyExtent,SortedMap<Key,Value>> tablets = tservers.get(tserver);
 +      
 +      if (tablets == null) {
 +        parent.invalidateCache(tserver);
 +        return list;
 +      }
 +      
 +      TreeMap<Key,Value> results = new TreeMap<Key,Value>();
 +      
 +      Set<Entry<KeyExtent,List<Range>>> es = map.entrySet();
 +      List<KeyExtent> failures = new ArrayList<KeyExtent>();
 +      for (Entry<KeyExtent,List<Range>> entry : es) {
 +        SortedMap<Key,Value> tabletData = tablets.get(entry.getKey());
 +        
 +        if (tabletData == null) {
 +          failures.add(entry.getKey());
 +          continue;
 +        }
 +        List<Range> ranges = entry.getValue();
 +        for (Range range : ranges) {
 +          SortedMap<Key,Value> tm;
 +          if (range.getStartKey() == null)
 +            tm = tabletData;
 +          else
 +            tm = tabletData.tailMap(range.getStartKey());
 +          
 +          for (Entry<Key,Value> de : tm.entrySet()) {
 +            if (range.afterEndKey(de.getKey())) {
 +              break;
 +            }
 +            
 +            if (range.contains(de.getKey())) {
 +              results.put(de.getKey(), de.getValue());
 +            }
 +          }
 +        }
 +      }
 +      
 +      if (failures.size() > 0)
 +        parent.invalidateCache(failures);
 +      
 +      SortedMap<KeyExtent,Text> metadata = MetadataTable.getMetadataLocationEntries(results).getFirst();
 +      
 +      for (Entry<KeyExtent,Text> entry : metadata.entrySet()) {
 +        list.add(new TabletLocation(entry.getKey(), entry.getValue().toString()));
 +      }
 +      
 +      return list;
 +      
 +    }
 +    
 +  }
 +  
 +  /**
 +   * Makes sure {@code tablet} exists on {@code server} with no entries, creating the server's tablet map on first use.
 +   *
 +   * @throws RuntimeException
 +   *           if the tablet is already present on that server and contains entries
 +   */
 +  static void createEmptyTablet(TServers tservers, String server, KeyExtent tablet) {
 +    Map<KeyExtent,SortedMap<Key,Value>> hosted = tservers.tservers.get(server);
 +    if (hosted == null) {
 +      hosted = new HashMap<KeyExtent,SortedMap<Key,Value>>();
 +      tservers.tservers.put(server, hosted);
 +    }
 +
 +    SortedMap<Key,Value> existing = hosted.get(tablet);
 +    if (existing != null) {
 +      // an existing tablet is acceptable only while it is still empty
 +      if (existing.size() > 0)
 +        throw new RuntimeException("Asked for empty tablet, but non empty tablet exists");
 +      return;
 +    }
 +
 +    hosted.put(tablet, new TreeMap<Key,Value>());
 +  }
 +  
 +  /**
 +   * Writes the metadata entries for extent {@code ke} into the data of {@code tablet} on {@code server}, creating the server and tablet maps when absent.
 +   * <p>
 +   * A prev-row entry is always written. A current-location entry (column qualifier {@code instance}, or the empty string when {@code instance} is null) is
 +   * written only when {@code location} is non-null.
 +   */
 +  static void setLocation(TServers tservers, String server, KeyExtent tablet, KeyExtent ke, String location, String instance) {
 +    Map<KeyExtent,SortedMap<Key,Value>> hosted = tservers.tservers.get(server);
 +    if (hosted == null) {
 +      hosted = new HashMap<KeyExtent,SortedMap<Key,Value>>();
 +      tservers.tservers.put(server, hosted);
 +    }
 +
 +    SortedMap<Key,Value> entries = hosted.get(tablet);
 +    if (entries == null) {
 +      entries = new TreeMap<Key,Value>();
 +      hosted.put(tablet, entries);
 +    }
 +
 +    Text metadataRow = ke.getMetadataEntry();
 +    Value encodedPrevEndRow = KeyExtent.encodePrevEndRow(ke.getPrevEndRow());
 +
 +    if (location != null) {
 +      Text qualifier = new Text(instance == null ? "" : instance);
 +      entries.put(new Key(metadataRow, Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY, qualifier), new Value(location.getBytes()));
 +    }
 +
 +    Key prevRowKey = new Key(metadataRow, Constants.METADATA_PREV_ROW_COLUMN.getColumnFamily(), Constants.METADATA_PREV_ROW_COLUMN.getColumnQualifier());
 +    entries.put(prevRowKey, encodedPrevEndRow);
 +  }
 +  
 +  static void setLocation(TServers tservers, String server, KeyExtent tablet, KeyExtent ke, String location) {
 +    setLocation(tservers, server, tablet, ke, location, "");
 +  }
 +
 +  /** Removes {@code server}, together with all tablet data registered for it, from the fixture. */
 +  static void deleteServer(TServers tservers, String server) {
 +    tservers.tservers.remove(server);
 +  }
 +  
 +  /**
 +   * Looks up {@code row} through {@code cache} and asserts the outcome.
 +   * <p>
 +   * When {@code expected} is null the lookup must return null. Otherwise the returned location must be non-null and match {@code server} and
 +   * {@code expected}.
 +   */
 +  private void locateTabletTest(TabletLocatorImpl cache, String row, boolean skipRow, KeyExtent expected, String server, TCredentials credentials)
 +      throws Exception {
 +    TabletLocation found = cache.locateTablet(new Text(row), skipRow, false, credentials);
 +
 +    if (expected != null) {
 +      assertNotNull(found);
 +      assertEquals(server, found.tablet_location);
 +      assertEquals(expected, found.tablet_extent);
 +      return;
 +    }
 +
 +    // no tablet is expected: print the unexpected location before the assertion fails
 +    if (found != null)
 +      System.out.println("tl = " + found);
 +    assertNull(found);
 +  }
 +  
 +  private void locateTabletTest(TabletLocatorImpl cache, String row, KeyExtent expected, String server, TCredentials credentials) throws Exception {
 +    locateTabletTest(cache, row, false, expected, server, credentials);
 +  }
 +  
 +  /**
 +   * End-to-end scenario for a locator chain (root tablet locator, !METADATA cache, "tab1" cache) backed by the in-memory {@code TServers} fixture.
 +   * <p>
 +   * The test repeatedly rewrites the simulated metadata and then checks, through {@code locateTabletTest}, which extent and location the "tab1" cache
 +   * reports for sample rows. Every section relies on the cache state left behind by the previous one, so the statement order is significant.
 +   * <p>
 +   * NOTE(review): {@code nke}, {@code RTE}, {@code MTE} and {@code credential} are defined outside this view; {@code nke(table, endRow, prevEndRow)} is the
 +   * argument order suggested by the expectations below - confirm.
 +   */
 +  public void test1() throws Exception {
 +    TServers tservers = new TServers();
 +    TestTabletLocationObtainer ttlo = new TestTabletLocationObtainer(tservers);
 +    TestInstance testInstance = new TestInstance("instance1", "tserver1");
 +    
 +    RootTabletLocator rtl = new RootTabletLocator(testInstance);
 +    TabletLocatorImpl rootTabletCache = new TabletLocatorImpl(new Text(Constants.METADATA_TABLE_ID), rtl, ttlo);
 +    TabletLocatorImpl tab1TabletCache = new TabletLocatorImpl(new Text("tab1"), rootTabletCache, ttlo);
 +    
 +    // nothing has been registered in the fixture yet, so no tablet can be located
 +    locateTabletTest(tab1TabletCache, "r1", null, null, credential);
 +    
 +    KeyExtent tab1e = nke("tab1", null, null);
 +    
 +    setLocation(tservers, "tserver1", RTE, MTE, "tserver2");
 +    setLocation(tservers, "tserver2", MTE, tab1e, "tserver3");
 +    
 +    locateTabletTest(tab1TabletCache, "r1", tab1e, "tserver3", credential);
 +    locateTabletTest(tab1TabletCache, "r2", tab1e, "tserver3", credential);
 +    
 +    // simulate a split
 +    KeyExtent tab1e1 = nke("tab1", "g", null);
 +    KeyExtent tab1e2 = nke("tab1", null, "g");
 +    
 +    setLocation(tservers, "tserver2", MTE, tab1e1, "tserver4");
 +    setLocation(tservers, "tserver2", MTE, tab1e2, "tserver5");
 +    
 +    // the pre-split extent is still reported until it is explicitly invalidated
 +    locateTabletTest(tab1TabletCache, "r1", tab1e, "tserver3", credential);
 +    tab1TabletCache.invalidateCache(tab1e);
 +    locateTabletTest(tab1TabletCache, "r1", tab1e2, "tserver5", credential);
 +    locateTabletTest(tab1TabletCache, "a", tab1e1, "tserver4", credential);
 +    locateTabletTest(tab1TabletCache, "a", true, tab1e1, "tserver4", credential);
 +    // with skipRow == true, row "g" is expected to resolve to the tablet after the one it resolves to otherwise
 +    locateTabletTest(tab1TabletCache, "g", tab1e1, "tserver4", credential);
 +    locateTabletTest(tab1TabletCache, "g", true, tab1e2, "tserver5", credential);
 +    
 +    // simulate a partial split
 +    KeyExtent tab1e22 = nke("tab1", null, "m");
 +    setLocation(tservers, "tserver2", MTE, tab1e22, "tserver6");
 +    locateTabletTest(tab1TabletCache, "r1", tab1e2, "tserver5", credential);
 +    tab1TabletCache.invalidateCache(tab1e2);
 +    locateTabletTest(tab1TabletCache, "r1", tab1e22, "tserver6", credential);
 +    // only one half of the split has been written so far, so "h" is expected to resolve to nothing
 +    locateTabletTest(tab1TabletCache, "h", null, null, credential);
 +    locateTabletTest(tab1TabletCache, "a", tab1e1, "tserver4", credential);
 +    KeyExtent tab1e21 = nke("tab1", "m", "g");
 +    setLocation(tservers, "tserver2", MTE, tab1e21, "tserver7");
 +    locateTabletTest(tab1TabletCache, "r1", tab1e22, "tserver6", credential);
 +    locateTabletTest(tab1TabletCache, "h", tab1e21, "tserver7", credential);
 +    locateTabletTest(tab1TabletCache, "a", tab1e1, "tserver4", credential);
 +    
 +    // simulate a migration
 +    setLocation(tservers, "tserver2", MTE, tab1e21, "tserver8");
 +    tab1TabletCache.invalidateCache(tab1e21);
 +    locateTabletTest(tab1TabletCache, "r1", tab1e22, "tserver6", credential);
 +    locateTabletTest(tab1TabletCache, "h", tab1e21, "tserver8", credential);
 +    locateTabletTest(tab1TabletCache, "a", tab1e1, "tserver4", credential);
 +    
 +    // simulate a server failure
 +    setLocation(tservers, "tserver2", MTE, tab1e21, "tserver9");
 +    tab1TabletCache.invalidateCache("tserver8");
 +    locateTabletTest(tab1TabletCache, "r1", tab1e22, "tserver6", credential);
 +    locateTabletTest(tab1TabletCache, "h", tab1e21, "tserver9", credential);
 +    locateTabletTest(tab1TabletCache, "a", tab1e1, "tserver4", credential);
 +    
 +    // simulate all servers failing
 +    deleteServer(tservers, "tserver1");
 +    deleteServer(tservers, "tserver2");
 +    tab1TabletCache.invalidateCache("tserver4");
 +    tab1TabletCache.invalidateCache("tserver6");
 +    tab1TabletCache.invalidateCache("tserver9");
 +    
 +    locateTabletTest(tab1TabletCache, "r1", null, null, credential);
 +    locateTabletTest(tab1TabletCache, "h", null, null, credential);
 +    locateTabletTest(tab1TabletCache, "a", null, null, credential);
 +    
 +    // bring the cluster back with the root tablet and all other tablets on different servers
 +    testInstance.setRootTabletLocation("tserver4");
 +    setLocation(tservers, "tserver4", RTE, MTE, "tserver5");
 +    setLocation(tservers, "tserver5", MTE, tab1e1, "tserver1");
 +    setLocation(tservers, "tserver5", MTE, tab1e21, "tserver2");
 +    setLocation(tservers, "tserver5", MTE, tab1e22, "tserver3");
 +    
 +    locateTabletTest(tab1TabletCache, "a", tab1e1, "tserver1", credential);
 +    locateTabletTest(tab1TabletCache, "h", tab1e21, "tserver2", credential);
 +    locateTabletTest(tab1TabletCache, "r", tab1e22, "tserver3", credential);
 +    
 +    // simulate the !METADATA table splitting
 +    KeyExtent mte1 = new KeyExtent(new Text(Constants.METADATA_TABLE_ID), tab1e21.getMetadataEntry(), RTE.getEndRow());
 +    KeyExtent mte2 = new KeyExtent(new Text(Constants.METADATA_TABLE_ID), null, tab1e21.getMetadataEntry());
 +    
 +    setLocation(tservers, "tserver4", RTE, mte1, "tserver5");
 +    setLocation(tservers, "tserver4", RTE, mte2, "tserver6");
 +    // drop the old, unsplit !METADATA tablet data from tserver5 before re-registering it as mte1
 +    deleteServer(tservers, "tserver5");
 +    setLocation(tservers, "tserver5", mte1, tab1e1, "tserver7");
 +    setLocation(tservers, "tserver5", mte1, tab1e21, "tserver8");
 +    setLocation(tservers, "tserver6", mte2, tab1e22, "tserver9");
 +    
 +    tab1TabletCache.invalidateCache(tab1e1);
 +    tab1TabletCache.invalidateCache(tab1e21);
 +    tab1TabletCache.invalidateCache(tab1e22);
 +    
 +    locateTabletTest(tab1TabletCache, "a", tab1e1, "tserver7", credential);
 +    locateTabletTest(tab1TabletCache, "h", tab1e21, "tserver8", credential);
 +    locateTabletTest(tab1TabletCache, "r", tab1e22, "tserver9", credential);
 +    
 +    // simulate metadata and regular server down and then reassigned
 +    deleteServer(tservers, "tserver5");
 +    tab1TabletCache.invalidateCache("tserver7");
 +    // only the entry cached for tserver7 was dropped; the other two are still served from the cache
 +    locateTabletTest(tab1TabletCache, "a", null, null, credential);
 +    locateTabletTest(tab1TabletCache, "h", tab1e21, "tserver8", credential);
 +    locateTabletTest(tab1TabletCache, "r", tab1e22, "tserver9", credential);
 +    
 +    setLocation(tservers, "tserver4", RTE, mte1, "tserver10");
 +    setLocation(tservers, "tserver10", mte1, tab1e1, "tserver7");
 +    setLocation(tservers, "tserver10", mte1, tab1e21, "tserver8");
 +    
 +    locateTabletTest(tab1TabletCache, "a", tab1e1, "tserver7", credential);
 +    locateTabletTest(tab1TabletCache, "h", tab1e21, "tserver8", credential);
 +    locateTabletTest(tab1TabletCache, "r", tab1e22, "tserver9", credential);
 +    tab1TabletCache.invalidateCache("tserver7");
 +    setLocation(tservers, "tserver10", mte1, tab1e1, "tserver2");
 +    locateTabletTest(tab1TabletCache, "a", tab1e1, "tserver2", credential);
 +    locateTabletTest(tab1TabletCache, "h", tab1e21, "tserver8", credential);
 +    locateTabletTest(tab1TabletCache, "r", tab1e22, "tserver9", credential);
 +    
 +    // simulate a hole in the !METADATA table, caused by a partial split
 +    KeyExtent mte11 = new KeyExtent(new Text(Constants.METADATA_TABLE_ID), tab1e1.getMetadataEntry(), RTE.getEndRow());
 +    KeyExtent mte12 = new KeyExtent(new Text(Constants.METADATA_TABLE_ID), tab1e21.getMetadataEntry(), tab1e1.getMetadataEntry());
 +    deleteServer(tservers, "tserver10");
 +    setLocation(tservers, "tserver4", RTE, mte12, "tserver10");
 +    setLocation(tservers, "tserver10", mte12, tab1e21, "tserver12");
 +    
 +    // at this point should be no info in !METADATA about tab1e1
 +    tab1TabletCache.invalidateCache(tab1e1);
 +    tab1TabletCache.invalidateCache(tab1e21);
 +    locateTabletTest(tab1TabletCache, "a", null, null, credential);
 +    locateTabletTest(tab1TabletCache, "h", tab1e21, "tserver12", credential);
 +    locateTabletTest(tab1TabletCache, "r", tab1e22, "tserver9", credential);
 +    
 +    // fill the hole: register the missing !METADATA tablet and the user tablet it describes
 +    setLocation(tservers, "tserver4", RTE, mte11, "tserver5");
 +    setLocation(tservers, "tserver5", mte11, tab1e1, "tserver13");
 +    
 +    locateTabletTest(tab1TabletCache, "a", tab1e1, "tserver13", credential);
 +    locateTabletTest(tab1TabletCache, "h", tab1e21, "tserver12", credential);
 +    locateTabletTest(tab1TabletCache, "r", tab1e22, "tserver9", credential);
 +  }
 +  
 +  /**
 +   * Checks that a tablet whose metadata has no location entry is not returned while a sibling tablet with a location is, and that the first tablet becomes
 +   * locatable once a location is written for it.
 +   */
 +  public void test2() throws Exception {
 +    TServers tservers = new TServers();
 +    TabletLocatorImpl metaCache = createLocators(tservers, "tserver1", "tserver2", "foo");
 +    
 +    KeyExtent ke1 = nke("foo", "m", null);
 +    KeyExtent ke2 = nke("foo", null, "m");
 +    
 +    // a null location makes setLocation write only the prev-row entry for ke1
 +    setLocation(tservers, "tserver2", MTE, ke1, null);
 +    setLocation(tservers, "tserver2", MTE, ke2, "L1");
 +    
 +    locateTabletTest(metaCache, "a", null, null, credential);
 +    locateTabletTest(metaCache, "r", ke2, "L1", credential);
 +    
 +    // now give ke1 a location as well
 +    setLocation(tservers, "tserver2", MTE, ke1, "L2");
 +    
 +    locateTabletTest(metaCache, "a", ke1, "L2", credential);
 +    locateTabletTest(metaCache, "r", ke2, "L1", credential);
 +  }
 +  
 +  /**
 +   * Bins an unbounded range, a start-bounded range and an end-bounded range against a table with a single tablet at "l1". Each range is expected to be
 +   * assigned to that tablet.
 +   * <p>
 +   * NOTE(review): {@code nr}, {@code nrl}, {@code nol}, {@code nke}, {@code createLocators}, {@code createExpectedBinnings} and {@code runTest} are helpers
 +   * defined outside this view; the descriptions here are inferred from how they are used - confirm.
 +   */
 +  public void testBinRanges1() throws Exception {
 +    Text tableName = new Text("foo");
 +    
 +    TabletLocatorImpl metaCache = createLocators("foo", nke("foo", null, null), "l1");
 +    
 +    // unbounded on both sides
 +    List<Range> ranges = nrl(nr(null, null));
 +    Map<String,Map<KeyExtent,List<Range>>> expected = createExpectedBinnings("l1", nol(nke("foo", null, null), nrl(nr(null, null)))
 +    
 +    );
 +    
 +    runTest(tableName, ranges, metaCache, expected);
 +    
 +    // bounded below only
 +    ranges = nrl(nr("a", null));
 +    expected = createExpectedBinnings("l1", nol(nke("foo", null, null), nrl(nr("a", null)))
 +    
 +    );
 +    
 +    runTest(tableName, ranges, metaCache, expected);
 +    
 +    // bounded above only
 +    ranges = nrl(nr(null, "b"));
 +    expected = createExpectedBinnings("l1", nol(nke("foo", null, null), nrl(nr(null, "b")))
 +    
 +    );
 +    
 +    runTest(tableName, ranges, metaCache, expected);
 +  }
 +  
 +  /**
 +   * Bins a single unbounded range against two tablets split at "g" (located at "l1" and "l2") and expects the range to be assigned to both tablets.
 +   */
 +  public void testBinRanges2() throws Exception {
 +    Text table = new Text("foo");
 +    List<Range> wholeTable = nrl(nr(null, null));
 +
 +    TabletLocatorImpl locator = createLocators("foo", nke("foo", "g", null), "l1", nke("foo", null, "g"), "l2");
 +
 +    // one expected entry per location, each holding the same unbounded range
 +    Map<String,Map<KeyExtent,List<Range>>> binnings = createExpectedBinnings(
 +        "l1", nol(nke("foo", "g", null), nrl(nr(null, null))),
 +        "l2", nol(nke("foo", null, "g"), nrl(nr(null, null))));
 +
 +    runTest(table, wholeTable, locator, binnings);
 +  }
 +  
 +  /**
 +   * Bins ranges against three tablets split at "g" and "m", where the first tablet is at "l1" and the other two share "l2". Covers a whole-table range,
 +   * ranges confined to single tablets, a range spanning two tablets, and all four inclusive/exclusive combinations of a range running from "g" to "m".
 +   */
 +  public void testBinRanges3() throws Exception {
 +    
 +    Text tableName = new Text("foo");
 +    
 +    // test with three tablets and a range that covers the whole table
 +    List<Range> ranges = nrl(nr(null, null));
 +    TabletLocatorImpl metaCache = createLocators("foo", nke("foo", "g", null), "l1", nke("foo", "m", "g"), "l2", nke("foo", null, "m"), "l2");
 +    
 +    Map<String,Map<KeyExtent,List<Range>>> expected = createExpectedBinnings("l1", nol(nke("foo", "g", null), nrl(nr(null, null))), "l2",
 +        nol(nke("foo", "m", "g"), nrl(nr(null, null)), nke("foo", null, "m"), nrl(nr(null, null)))
 +    
 +    );
 +    
 +    runTest(tableName, ranges, metaCache, expected);
 +    
 +    // test with three tablets where one range falls within the first tablet and last two ranges fall within the last tablet
 +    ranges = nrl(nr(null, "c"), nr("s", "y"), nr("z", null));
 +    expected = createExpectedBinnings("l1", nol(nke("foo", "g", null), nrl(nr(null, "c"))), "l2", nol(nke("foo", null, "m"), nrl(nr("s", "y"), nr("z", null)))
 +    
 +    );
 +    
 +    runTest(tableName, ranges, metaCache, expected);
 +    
 +    // test is same as above, but has an additional range that spans the first two tablets
 +    ranges = nrl(nr(null, "c"), nr("f", "i"), nr("s", "y"), nr("z", null));
 +    expected = createExpectedBinnings("l1", nol(nke("foo", "g", null), nrl(nr(null, "c"), nr("f", "i"))), "l2",
 +        nol(nke("foo", "m", "g"), nrl(nr("f", "i")), nke("foo", null, "m"), nrl(nr("s", "y"), nr("z", null)))
 +    
 +    );
 +    
 +    runTest(tableName, ranges, metaCache, expected);
 +    
 +    // test where start of range is not inclusive and same as tablet endrow
 +    ranges = nrl(nr("g", false, "m", true));
 +    expected = createExpectedBinnings("l2", nol(nke("foo", "m", "g"), nrl(nr("g", false, "m", true)))
 +    
 +    );
 +    
 +    runTest(tableName, ranges, metaCache, expected);
 +    
 +    // test where start of range is inclusive and same as tablet endrow
 +    ranges = nrl(nr("g", true, "m", true));
 +    expected = createExpectedBinnings("l1", nol(nke("foo", "g", null), nrl(nr("g", true, "m", true))), "l2",
 +        nol(nke("foo", "m", "g"), nrl(nr("g", true, "m", true)))
 +    
 +    );
 +    
 +    runTest(tableName, ranges, metaCache, expected);
 +    
 +    // same inclusive start, but the end "m" is now exclusive
 +    ranges = nrl(nr("g", true, "m", false));
 +    expected = createExpectedBinnings("l1", nol(nke("foo", "g", null), nrl(nr("g", true, "m", false))), "l2",
 +        nol(nke("foo", "m", "g"), nrl(nr("g", true, "m", false)))
 +    
 +    );
 +    
 +    runTest(tableName, ranges, metaCache, expected);
 +    
 +    // both ends exclusive: only the middle tablet is expected
 +    ranges = nrl(nr("g", false, "m", false));
 +    expected = createExpectedBinnings("l2", nol(nke("foo", "m", "g"), nrl(nr("g", false, "m", false)))
 +    
 +    );
 +    
 +    runTest(tableName, ranges, metaCache, expected);
 +  }
 +  
 +  /**
 +   * Bins single ranges against five tablets split at rows "0", "1", "2" and "3" (locations "l1" to "l5"). Covers a single-row range, ranges built from a
 +   * row/column key, and every inclusive/exclusive combination of a range running from row "2" to row "3".
 +   * <p>
 +   * NOTE(review): the trailing {@code nrl()} passed to {@code runTest} is presumably the (empty) list of ranges expected to fail binning - confirm against
 +   * {@code runTest}, which is defined outside this view.
 +   */
 +  public void testBinRanges4() throws Exception {
 +    Text tableName = new Text("foo");
 +    
 +    // a range over the single row "1" is expected on the tablet ("1", "0") only
 +    List<Range> ranges = nrl(new Range(new Text("1")));
 +    TabletLocatorImpl metaCache = createLocators("foo", nke("foo", "0", null), "l1", nke("foo", "1", "0"), "l2", nke("foo", "2", "1"), "l3",
 +        nke("foo", "3", "2"), "l4", nke("foo", null, "3"), "l5");
 +    
 +    Map<String,Map<KeyExtent,List<Range>>> expected = createExpectedBinnings("l2", nol(nke("foo", "1", "0"), nrl(new Range(new Text("1"))))
 +    
 +    );
 +    
 +    runTest(tableName, ranges, metaCache, expected);
 +    
 +    // starts at a column inside row "3" and ends, exclusively, at the key following row "3"
 +    Key rowColKey = new Key(new Text("3"), new Text("cf1"), new Text("cq1"));
 +    Range range = new Range(rowColKey, true, new Key(new Text("3")).followingKey(PartialKey.ROW), false);
 +    
 +    ranges = nrl(range);
 +    Map<String,Map<KeyExtent,List<Range>>> expected4 = createExpectedBinnings("l4", nol(nke("foo", "3", "2"), nrl(range))
 +    
 +    );
 +    
 +    runTest(tableName, ranges, metaCache, expected4, nrl());
 +    
 +    // same range with an inclusive end key: the following tablet is expected as well
 +    range = new Range(rowColKey, true, new Key(new Text("3")).followingKey(PartialKey.ROW), true);
 +    
 +    ranges = nrl(range);
 +    Map<String,Map<KeyExtent,List<Range>>> expected5 = createExpectedBinnings("l4", nol(nke("foo", "3", "2"), nrl(range)), "l5",
 +        nol(nke("foo", null, "3"), nrl(range))
 +    
 +    );
 +    
 +    runTest(tableName, ranges, metaCache, expected5, nrl());
 +    
 +    // row "2" exclusive to row "3" exclusive
 +    range = new Range(new Text("2"), false, new Text("3"), false);
 +    ranges = nrl(range);
 +    Map<String,Map<KeyExtent,List<Range>>> expected6 = createExpectedBinnings("l4", nol(nke("foo", "3", "2"), nrl(range))
 +    
 +    );
 +    runTest(tableName, ranges, metaCache, expected6, nrl());
 +    
 +    // row "2" inclusive to row "3" exclusive
 +    range = new Range(new Text("2"), true, new Text("3"), false);
 +    ranges = nrl(range);
 +    Map<String,Map<KeyExtent,List<Range>>> expected7 = createExpectedBinnings("l3", nol(nke("foo", "2", "1"), nrl(range)), "l4",
 +        nol(nke("foo", "3", "2"), nrl(range))
 +    
 +    );
 +    runTest(tableName, ranges, metaCache, expected7, nrl());
 +    
 +    // row "2" exclusive to row "3" inclusive
 +    range = new Range(new Text("2"), false, new Text("3"), true);
 +    ranges = nrl(range);
 +    Map<String,Map<KeyExtent,List<Range>>> expected8 = createExpectedBinnings("l4", nol(nke("foo", "3", "2"), nrl(range))
 +    
 +    );
 +    runTest(tableName, ranges, metaCache, expected8, nrl());
 +    
 +    // row "2" inclusive to row "3" inclusive
 +    range = new Range(new Text("2"), true, new Text("3"), true);
 +    ranges = nrl(range);
 +    Map<String,Map<KeyExtent,List<Range>>> expected9 = createExpectedBinnings("l3", nol(nke("foo", "2", "1"), nrl(range)), "l4",
 +        nol(nke("foo", "3", "2"), nrl(range))
 +    
 +    );
 +    runTest(tableName, ranges, metaCache, expected9, nrl());
 +    
 +  }
 +  
 +  /**
 +   * Bins ranges when the locator knows about tablets ("0", null), ("1", "0"), ("3", "2") and (null, "3") but has no tablet ("2", "1"), i.e. there is a hole
 +   * in the metadata.
 +   * <p>
 +   * NOTE(review): the fifth argument passed to {@code runTest} is presumably the list of ranges expected to fail binning - confirm against {@code runTest},
 +   * which is defined outside this view.
 +   */
 +  public void testBinRanges5() throws Exception {
 +    // Test binning when there is a hole in the !METADATA information
 +    Text tableName = new Text("foo");
 +    
 +    List<Range> ranges = nrl(new Range(new Text("1")));
 +    TabletLocatorImpl metaCache = createLocators("foo", nke("foo", "0", null), "l1", nke("foo", "1", "0"), "l2", nke("foo", "3", "2"), "l4",
 +        nke("foo", null, "3"), "l5");
 +    
 +    Map<String,Map<KeyExtent,List<Range>>> expected1 = createExpectedBinnings("l2", nol(nke("foo", "1", "0"), nrl(new Range(new Text("1"))))
 +    
 +    );
 +    
 +    runTest(tableName, ranges, metaCache, expected1);
 +    
 +    // both rows sort after "1" and not after "2", so nothing is expected to be binned
 +    ranges = nrl(new Range(new Text("2")), new Range(new Text("11")));
 +    Map<String,Map<KeyExtent,List<Range>>> expected2 = createExpectedBinnings();
 +    
 +    runTest(tableName, ranges, metaCache, expected2, ranges);
 +    
 +    // row "1" is binned as before while row "2" is passed as the extra runTest argument
 +    ranges = nrl(new Range(new Text("1")), new Range(new Text("2")));
 +    
 +    runTest(tableName, ranges, metaCache, expected1, nrl(new Range(new Text("2"))));
 +    
 +    // only the second range appears in the expected binnings
 +    ranges = nrl(nr("0", "2"), nr("3", "4"));
 +    Map<String,Map<KeyExtent,List<Range>>> expected3 = createExpectedBinnings("l4", nol(nke("foo", "3", "2"), nrl(nr("3", "4"))), "l5",
 +        nol(nke("foo", null, "3"), nrl(nr("3", "4")))
 +    
 +    );
 +    
 +    runTest(tableName, ranges, metaCache, expected3, nrl(nr("0", "2")));
 +    
 +    // a mix of ranges: only ("0", "1") and ("21", "4") appear in the expected binnings
 +    ranges = nrl(nr("0", "1"), nr("0", "11"), nr("1", "2"), nr("0", "4"), nr("2", "4"), nr("21", "4"));
 +    Map<String,Map<KeyExtent,List<Range>>> expected4 = createExpectedBinnings("l1", nol(nke("foo", "0", null), nrl(nr("0", "1"))), "l2",
 +        nol(nke("foo", "1", "0"), nrl(nr("0", "1"))), "l4", nol(nke("foo", "3", "2"), nrl(nr("21", "4"))), "l5", nol(nke("foo", null, "3"), nrl(nr("21", "4")))
 +    
 +    );
 +    
 +    runTest(tableName, ranges, metaCache, expected4, nrl(nr("0", "11"), nr("1", "2"), nr("0", "4"), nr("2", "4")));
 +  }
 +  
 +  /**
 +   * Bins mutations against a table with a single tablet at "l1": two different rows, a single row, and two mutations for the same row. Every mutation is
 +   * expected on that tablet.
 +   * <p>
 +   * NOTE(review): {@code nm}, {@code nml}, {@code nol}, {@code cemb} and {@code runTest} are helpers defined outside this view; the descriptions here are
 +   * inferred from how they are used - confirm.
 +   */
 +  public void testBinMutations1() throws Exception {
 +    // one tablet table
 +    KeyExtent ke1 = nke("foo", null, null);
 +    TabletLocatorImpl metaCache = createLocators("foo", ke1, "l1");
 +    
 +    List<Mutation> ml = nml(nm("a", "cf1:cq1=v1", "cf1:cq2=v2"), nm("c", "cf1:cq1=v3", "cf1:cq2=v4"));
 +    Map<String,Map<KeyExtent,List<String>>> emb = cemb(nol("a", "l1", ke1), nol("c", "l1", ke1));
 +    runTest(metaCache, ml, emb);
 +    
 +    ml = nml(nm("a", "cf1:cq1=v1", "cf1:cq2=v2"));
 +    emb = cemb(nol("a", "l1", ke1));
 +    runTest(metaCache, ml, emb);
 +    
 +    // two separate mutations for the same row
 +    ml = nml(nm("a", "cf1:cq1=v1", "cf1:cq2=v2"), nm("a", "cf1:cq3=v3"));
 +    emb = cemb(nol("a", "l1", ke1), nol("a", "l1", ke1));
 +    runTest(metaCache, ml, emb);
 +    
 +  }
 +  
 +  /**
 +   * Bins two mutations against a locator created without any tablets for table "foo": the expected binnings are empty and rows "a" and "c" are handed to
 +   * {@code runTest} as the trailing arguments.
 +   */
 +  public void testBinMutations2() throws Exception {
 +    // no tablets for table
 +    TabletLocatorImpl locator = createLocators("foo");
 +
 +    List<Mutation> mutations = nml(nm("a", "cf1:cq1=v1", "cf1:cq2=v2"), nm("c", "cf1:cq1=v3", "cf1:cq2=v4"));
 +    Map<String,Map<KeyExtent,List<String>>> noBinnings = cemb();
 +
 +    runTest(locator, mutations, noBinnings, "a", "c");
 +  }
 +  
 +  /**
 +   * Bins mutations against three tablets split at "h" and "t" (locations "l1", "l2", "l3"), including rows that equal the split points.
 +   */
 +  public void testBinMutations3() throws Exception {
 +    // three tablet table
 +    KeyExtent ke1 = nke("foo", "h", null);
 +    KeyExtent ke2 = nke("foo", "t", "h");
 +    KeyExtent ke3 = nke("foo", null, "t");
 +    
 +    TabletLocatorImpl metaCache = createLocators("foo", ke1, "l1", ke2, "l2", ke3, "l3");
 +    
 +    List<Mutation> ml = nml(nm("a", "cf1:cq1=v1", "cf1:cq2=v2"), nm("i", "cf1:cq1=v3", "cf1:cq2=v4"));
 +    Map<String,Map<KeyExtent,List<String>>> emb = cemb(nol("a", "l1", ke1), nol("i", "l2", ke2));
 +    runTest(metaCache, ml, emb);
 +    
 +    ml = nml(nm("a", "cf1:cq1=v1", "cf1:cq2=v2"));
 +    emb = cemb(nol("a", "l1", ke1));
 +    runTest(metaCache, ml, emb);
 +    
 +    ml = nml(nm("a", "cf1:cq1=v1", "cf1:cq2=v2"), nm("a", "cf1:cq3=v3"));
 +    emb = cemb(nol("a", "l1", ke1), nol("a", "l1", ke1));
 +    runTest(metaCache, ml, emb);
 +    
 +    ml = nml(nm("a", "cf1:cq1=v1", "cf1:cq2=v2"), nm("w", "cf1:cq3=v3"));
 +    emb = cemb(nol("a", "l1", ke1), nol("w", "l3", ke3));
 +    runTest(metaCache, ml, emb);
 +    
 +    ml = nml(nm("a", "cf1:cq1=v1", "cf1:cq2=v2"), nm("w", "cf1:cq3=v3"), nm("z", "cf1:cq4=v4"));
 +    emb = cemb(nol("a", "l1", ke1), nol("w", "l3", ke3), nol("z", "l3", ke3));
 +    runTest(metaCache, ml, emb);
 +    
 +    // rows equal to the split points are expected on the tablet that ends at that row
 +    ml = nml(nm("h", "cf1:cq1=v1", "cf1:cq2=v2"), nm("t", "cf1:cq1=v1", "cf1:cq2=v2"));
 +    emb = cemb(nol("h", "l1", ke1), nol("t", "l2", ke2));
 +    runTest(metaCache, ml, emb);
 +  }
 +  
 +  /**
 +   * Bins mutations when the middle tablet ("t", "h") is missing from the locator.
 +   * <p>
 +   * NOTE(review): the trailing row arguments passed to {@code runTest} are presumably the rows of mutations expected to fail binning - confirm against
 +   * {@code runTest}, which is defined outside this view.
 +   */
 +  public void testBinMutations4() throws Exception {
 +    // three tablet table with a hole: the middle tablet is missing
 +    KeyExtent ke1 = nke("foo", "h", null);
 +    
 +    KeyExtent ke3 = nke("foo", null, "t");
 +    
 +    TabletLocatorImpl metaCache = createLocators("foo", ke1, "l1", ke3, "l3");
 +    
 +    // "i" is absent from the expected binnings and is passed as the trailing argument instead
 +    List<Mutation> ml = nml(nm("a", "cf1:cq1=v1", "cf1:cq2=v2"), nm("i", "cf1:cq1=v3", "cf1:cq2=v4"));
 +    Map<String,Map<KeyExtent,List<String>>> emb = cemb(nol("a", "l1", ke1));
 +    runTest(metaCache, ml, emb, "i");
 +    
 +    ml = nml(nm("a", "cf1:cq1=v1", "cf1:cq2=v2"));
 +    emb = cemb(nol("a", "l1", ke1));
 +    runTest(metaCache, ml, emb);
 +    
 +    ml = nml(nm("a", "cf1:cq1=v1", "cf1:cq2=v2"), nm("a", "cf1:cq3=v3"));
 +    emb = cemb(nol("a", "l1", ke1), nol("a", "l1", ke1));
 +    runTest(metaCache, ml, emb);
 +    
 +    ml = nml(nm("a", "cf1:cq1=v1", "cf1:cq2=v2"), nm("w", "cf1:cq3=v3"));
 +    emb = cemb(nol("a", "l1", ke1), nol("w", "l3", ke3));
 +    runTest(metaCache, ml, emb);
 +    
 +    ml = nml(nm("a", "cf1:cq1=v1", "cf1:cq2=v2"), nm("w", "cf1:cq3=v3"), nm("z", "cf1:cq4=v4"));
 +    emb = cemb(nol("a", "l1", ke1), nol("w", "l3", ke3), nol("z", "l3", ke3));
 +    runTest(metaCache, ml, emb);
 +    
 +    // "t" is likewise absent from the expected binnings and passed as the trailing argument
 +    ml = nml(nm("a", "cf1:cq1=v1", "cf1:cq2=v2"), nm("w", "cf1:cq3=v3"), nm("z", "cf1:cq4=v4"), nm("t", "cf1:cq5=v5"));
 +    emb = cemb(nol("a", "l1", ke1), nol("w", "l3", ke3), nol("z", "l3", ke3));
 +    runTest(metaCache, ml, emb, "t");
 +  }
 +  
 +  /**
 +   * Bins mutations and ranges while a single-tablet table splits at "n" in two steps: first only the upper half (null, "n") is registered, then the lower
 +   * half ("n", null). The scenario runs three times with a fresh fixture each time, exercising mutations only, ranges only, and both.
 +   */
 +  public void testBinSplit() throws Exception {
 +    // try binning mutations and ranges when a tablet splits
 +    
 +    for (int i = 0; i < 3; i++) {
 +      // when i == 0 only test binning mutations
 +      // when i == 1 only test binning ranges
 +      // when i == 2 test both
 +      
 +      KeyExtent ke1 = nke("foo", null, null);
 +      TServers tservers = new TServers();
 +      TabletLocatorImpl metaCache = createLocators(tservers, "tserver1", "tserver2", "foo", ke1, "l1");
 +      
 +      // before the split everything is expected on the single tablet at l1
 +      List<Mutation> ml = nml(nm("a", "cf1:cq1=v1", "cf1:cq2=v2"), nm("m", "cf1:cq1=v3", "cf1:cq2=v4"), nm("z", "cf1:cq1=v5"));
 +      Map<String,Map<KeyExtent,List<String>>> emb = cemb(nol("a", "l1", ke1), nol("m", "l1", ke1), nol("z", "l1", ke1));
 +      if (i == 0 || i == 2)
 +        runTest(metaCache, ml, emb);
 +      
 +      List<Range> ranges = nrl(new Range(new Text("a")), new Range(new Text("m")), new Range(new Text("z")));
 +      
 +      Map<String,Map<KeyExtent,List<Range>>> expected1 = createExpectedBinnings("l1", nol(nke("foo", null, null), ranges)
 +      
 +      );
 +      
 +      if (i == 1 || i == 2)
 +        runTest(new Text("foo"), ranges, metaCache, expected1);
 +      
 +      KeyExtent ke11 = nke("foo", "n", null);
 +      KeyExtent ke12 = nke("foo", null, "n");
 +      
 +      // first step of the split: only the upper half is registered
 +      setLocation(tservers, "tserver2", MTE, ke12, "l2");
 +      
 +      metaCache.invalidateCache(ke1);
 +      
 +      // only "z" is expected to be binned; "a" and "m" are passed as the extra runTest arguments
 +      emb = cemb(nol("z", "l2", ke12));
 +      if (i == 0 || i == 2)
 +        runTest(metaCache, ml, emb, "a", "m");
 +      
 +      Map<String,Map<KeyExtent,List<Range>>> expected2 = createExpectedBinnings("l2", nol(nke("foo", null, "n"), nrl(new Range(new Text("z"))))
 +      
 +      );
 +      
 +      if (i == 1 || i == 2)
 +        runTest(new Text("foo"), ranges, metaCache, expected2, nrl(new Range(new Text("a")), new Range(new Text("m"))));
 +      
 +      // second step of the split: the lower half is registered as well
 +      setLocation(tservers, "tserver2", MTE, ke11, "l3");
 +      emb = cemb(nol("a", "l3", ke11), nol("m", "l3", ke11), nol("z", "l2", ke12));
 +      if (i == 0 || i == 2)
 +        runTest(metaCache, ml, emb);
 +      
 +      Map<String,Map<KeyExtent,List<Range>>> expected3 = createExpectedBinnings("l2", nol(nke("foo", null, "n"), nrl(new Range(new Text("z")))), "l3",
 +          nol(nke("foo", "n", null), nrl(new Range(new Text("a")), new Range(new Text("m"))))
 +      
 +      );
 +      
 +      if (i == 1 || i == 2)
 +        runTest(new Text("foo"), ranges, metaCache, expected3);
 +    }
 +  }
 +  
 +  /**
 +   * Locates a row whose user tablet is described in the second of two !METADATA tablets, where the looked-up row sorts after the only user tablet described
 +   * in the first !METADATA tablet.
 +   */
 +  public void testBug1() throws Exception {
 +    // a bug that occurred while running continuous ingest
 +    KeyExtent mte1 = new KeyExtent(new Text(Constants.METADATA_TABLE_ID), new Text("0;0bc"), RTE.getEndRow());
 +    KeyExtent mte2 = new KeyExtent(new Text(Constants.METADATA_TABLE_ID), null, new Text("0;0bc"));
 +    
 +    TServers tservers = new TServers();
 +    TestTabletLocationObtainer ttlo = new TestTabletLocationObtainer(tservers);
 +    TestInstance testInstance = new TestInstance("instance1", "tserver1");
 +    
 +    RootTabletLocator rtl = new RootTabletLocator(testInstance);
 +    TabletLocatorImpl rootTabletCache = new TabletLocatorImpl(new Text(Constants.METADATA_TABLE_ID), rtl, ttlo);
 +    TabletLocatorImpl tab0TabletCache = new TabletLocatorImpl(new Text("0"), rootTabletCache, ttlo);
 +    
 +    // the root tablet on tserver1 points at the two !METADATA tablets
 +    setLocation(tservers, "tserver1", RTE, mte1, "tserver2");
 +    setLocation(tservers, "tserver1", RTE, mte2, "tserver3");
 +    
 +    // create two tablets that straddle a !METADATA split point
 +    KeyExtent ke1 = new KeyExtent(new Text("0"), new Text("0bbf20e"), null);
 +    KeyExtent ke2 = new KeyExtent(new Text("0"), new Text("0bc0756"), new Text("0bbf20e"));
 +    
 +    setLocation(tservers, "tserver2", mte1, ke1, "tserver4");
 +    setLocation(tservers, "tserver3", mte2, ke2, "tserver5");
 +    
 +    // look up something that comes after the last entry in mte1
 +    locateTabletTest(tab0TabletCache, "0bbff", ke2, "tserver5", credential);
 +  }
 +  
 +  /**
 +   * Looks up a row in table "0" when the !METADATA tablets hold no entries for that table. The first !METADATA tablet is never registered on its server and
 +   * the second exists but is empty. The lookup is expected to return null.
 +   */
 +  public void testBug2() throws Exception {
 +    // a bug that occurred while running a functional test
 +    KeyExtent mte1 = new KeyExtent(new Text(Constants.METADATA_TABLE_ID), new Text("~"), RTE.getEndRow());
 +    KeyExtent mte2 = new KeyExtent(new Text(Constants.METADATA_TABLE_ID), null, new Text("~"));
 +    
 +    TServers tservers = new TServers();
 +    TestTabletLocationObtainer ttlo = new TestTabletLocationObtainer(tservers);
 +    TestInstance testInstance = new TestInstance("instance1", "tserver1");
 +    
 +    RootTabletLocator rtl = new RootTabletLocator(testInstance);
 +    TabletLocatorImpl rootTabletCache = new TabletLocatorImpl(new Text(Constants.METADATA_TABLE_ID), rtl, ttlo);
 +    TabletLocatorImpl tab0TabletCache = new TabletLocatorImpl(new Text("0"), rootTabletCache, ttlo);
 +    
 +    setLocation(tservers, "tserver1", RTE, mte1, "tserver2");
 +    setLocation(tservers, "tserver1", RTE, mte2, "tserver3");
 +    
 +    // create the ~ tablet so it exists
 +    Map<KeyExtent,SortedMap<Key,Value>> ts3 = new HashMap<KeyExtent,SortedMap<Key,Value>>();
 +    ts3.put(mte2, new TreeMap<Key,Value>());
 +    tservers.tservers.put("tserver3", ts3);
 +    
 +    // nothing describes table "0", so no tablet should be found
 +    assertNull(tab0TabletCache.locateTablet(new Text("row_0000000000"), false, false, credential));
 +    
 +  }
 +  
 +  // This test reproduces a problem where empty metadata tablets, which were created by user tablets being merged away, caused locating tablets to fail.
 +  // Four empty !METADATA tablets precede the one that holds the only entry for table "1"; the lookup must still find that entry.
 +  public void testBug3() throws Exception {
 +    KeyExtent mte1 = new KeyExtent(new Text(Constants.METADATA_TABLE_ID), new Text("1;c"), RTE.getEndRow());
 +    KeyExtent mte2 = new KeyExtent(new Text(Constants.METADATA_TABLE_ID), new Text("1;f"), new Text("1;c"));
 +    KeyExtent mte3 = new KeyExtent(new Text(Constants.METADATA_TABLE_ID), new Text("1;j"), new Text("1;f"));
 +    KeyExtent mte4 = new KeyExtent(new Text(Constants.METADATA_TABLE_ID), new Text("1;r"), new Text("1;j"));
 +    KeyExtent mte5 = new KeyExtent(new Text(Constants.METADATA_TABLE_ID), null, new Text("1;r"));
 +    
 +    KeyExtent ke1 = new KeyExtent(new Text("1"), null, null);
 +    
 +    TServers tservers = new TServers();
 +    TestTabletLocationObtainer ttlo = new TestTabletLocationObtainer(tservers);
 +    TestInstance testInstance = new TestInstance("instance1", "tserver1");
 +    
 +    RootTabletLocator rtl = new RootTabletLocator(testInstance);
 +    
 +    TabletLocatorImpl rootTabletCache = new TabletLocatorImpl(new Text(Constants.METADATA_TABLE_ID), rtl, ttlo);
 +    TabletLocatorImpl tab0TabletCache = new TabletLocatorImpl(new Text("1"), rootTabletCache, ttlo);
 +    
 +    // the root tablet on tserver1 points at the five !METADATA tablets
 +    setLocation(tservers, "tserver1", RTE, mte1, "tserver2");
 +    setLocation(tservers, "tserver1", RTE, mte2, "tserver3");
 +    setLocation(tservers, "tserver1", RTE, mte3, "tserver4");
 +    setLocation(tservers, "tserver1", RTE, mte4, "tserver5");
 +    setLocation(tservers, "tserver1", RTE, mte5, "tserver6");
 +    
 +    // the first four !METADATA tablets exist but hold no entries; only the last one describes ke1
 +    createEmptyTablet(tservers, "tserver2", mte1);
 +    createEmptyTablet(tservers, "tserver3", mte2);
 +    createEmptyTablet(tservers, "tserver4", mte3);
 +    createEmptyTablet(tservers, "tserver5", mte4);
 +    setLocation(tservers, "tserver6", mte5, ke1, "tserver7");
 +    
 +    locateTabletTest(tab0TabletCache, "a", ke1, "tserver7", credential);
 +    
 +  }
 +  
 +  public void testAccumulo1248() throws Exception {
 +    TServers tservers = new TServers();
 +    TabletLocatorImpl metaCache = createLocators(tservers, "tserver1", "tserver2", "foo");
 +    
 +    KeyExtent ke1 = nke("foo", null, null);
 +    
 +    // set two locations for a tablet, this is not supposed to happen. The metadata cache should throw an exception if it sees this rather than caching one of
 +    // the locations.
 +    setLocation(tservers, "tserver2", MTE, ke1, "L1", "I1");
 +    setLocation(tservers, "tserver2", MTE, ke1, "L2", "I2");
 +    
 +    try {
 +      metaCache.locateTablet(new Text("a"), false, false, credential);
 +      assertTrue(false);
 +    } catch (Exception e) {
 +      
 +    }
 +
 +
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/04f81b50/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
----------------------------------------------------------------------
diff --cc examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
index 886c4ba,0000000..d78ae0e
mode 100644,000000..100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
@@@ -1,169 -1,0 +1,171 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.examples.simple.client;
 +
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.cli.BatchWriterOpts;
 +import org.apache.accumulo.core.cli.ClientOnRequiredTable;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.security.SecurityErrorCode;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.ColumnVisibility;
 +import org.apache.hadoop.io.Text;
 +
 +import com.beust.jcommander.Parameter;
 +
 +/**
 + * Simple example for writing random data to Accumulo. See docs/examples/README.batch for instructions.
 + * 
 + * The rows of the entries will be randomly generated numbers between a specified min and max (prefixed by "row_"). The column families will be "foo" and column
 + * qualifiers will be "1". The values will be random byte arrays of a specified size.
 + */
 +public class RandomBatchWriter {
 +  
 +  /**
 +   * Creates a random byte array of specified size using the specified seed.
 +   * 
 +   * @param rowid
 +   *          the seed to use for the random number generator
 +   * @param dataSize
 +   *          the size of the array
 +   * @return a random byte array
 +   */
 +  public static byte[] createValue(long rowid, int dataSize) {
 +    Random r = new Random(rowid);
 +    byte value[] = new byte[dataSize];
 +    
 +    r.nextBytes(value);
 +    
 +    // transform to printable chars
 +    for (int j = 0; j < value.length; j++) {
 +      value[j] = (byte) (((0xff & value[j]) % 92) + ' ');
 +    }
 +    
 +    return value;
 +  }
 +  
 +  /**
 +   * Creates a mutation on a specified row with column family "foo", column qualifier "1", specified visibility, and a random value of specified size.
 +   * 
 +   * @param rowid
 +   *          the row of the mutation
 +   * @param dataSize
 +   *          the size of the random value
 +   * @param visibility
 +   *          the visibility of the entry to insert
 +   * @return a mutation
 +   */
 +  public static Mutation createMutation(long rowid, int dataSize, ColumnVisibility visibility) {
 +    Text row = new Text(String.format("row_%010d", rowid));
 +    
 +    Mutation m = new Mutation(row);
 +    
 +    // create a random value that is a function of the
 +    // row id for verification purposes
 +    byte value[] = createValue(rowid, dataSize);
 +    
 +    m.put(new Text("foo"), new Text("1"), visibility, new Value(value));
 +    
 +    return m;
 +  }
 +  
 +  static class Opts extends ClientOnRequiredTable {
 +    @Parameter(names="--num", required=true)
 +    int num = 0;
 +    @Parameter(names="--min")
 +    long min = 0;
 +    @Parameter(names="--max")
 +    long max = Long.MAX_VALUE;
 +    @Parameter(names="--size", required=true, description="size of the value to write")
 +    int size = 0;
 +    @Parameter(names="--vis", converter=VisibilityConverter.class)
 +    ColumnVisibility visiblity = new ColumnVisibility("");
 +    @Parameter(names="--seed", description="seed for pseudo-random number generator")
 +    Long seed = null;
 +  }
 + 
 +  /**
 +   * Writes a specified number of entries to Accumulo using a {@link BatchWriter}.
 +   * 
 +   * @throws AccumuloException
 +   * @throws AccumuloSecurityException
 +   * @throws TableNotFoundException
 +   */
 +  public static void main(String[] args) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +    Opts opts = new Opts();
 +    BatchWriterOpts bwOpts = new BatchWriterOpts();
 +    opts.parseArgs(RandomBatchWriter.class.getName(), args, bwOpts);
++
++    if ((opts.max - opts.min) < opts.num) {
++      System.err.println(String.format("You must specify a min and a max that allow for at least num possible values. For example, you requested %d rows, but a min of %d and a max of %d only allows for %d rows.", opts.num, opts.min, opts.max, (opts.max - opts.min)));
++      System.exit(1);
++    }
 +    
 +    Random r;
 +    if (opts.seed == null)
 +      r = new Random();
 +    else {
 +      r = new Random(opts.seed);
 +    }
 +    
 +    Connector connector = opts.getConnector();
 +    BatchWriter bw = connector.createBatchWriter(opts.tableName, bwOpts.getBatchWriterConfig());
 +    
 +    // reuse the ColumnVisibility object to improve performance
 +    ColumnVisibility cv = opts.visiblity;
 +    
-     for (int i = 0; i < opts.num; i++) {
-       
++    for (int i = 0; i < opts.num; i++) {  
 +      long rowid = (Math.abs(r.nextLong()) % (opts.max - opts.min)) + opts.min;
-       
 +      Mutation m = createMutation(rowid, opts.size, cv);
-       
 +      bw.addMutation(m);
-       
 +    }
 +    
 +    try {
 +      bw.close();
 +    } catch (MutationsRejectedException e) {
 +      if (e.getAuthorizationFailuresMap().size() > 0) {
 +        HashMap<String,Set<SecurityErrorCode>> tables = new HashMap<String,Set<SecurityErrorCode>>();
 +        for (Entry<KeyExtent,Set<SecurityErrorCode>> ke : e.getAuthorizationFailuresMap().entrySet()) {
 +          Set<SecurityErrorCode> secCodes = tables.get(ke.getKey().getTableId().toString());
 +          if (secCodes == null) {
 +            secCodes = new HashSet<SecurityErrorCode>();
 +            tables.put(ke.getKey().getTableId().toString(), secCodes);
 +          }
 +          secCodes.addAll(ke.getValue());
 +        }
 +        System.err.println("ERROR : Not authorized to write to tables : " + tables);
 +      }
 +      
 +      if (e.getConstraintViolationSummaries().size() > 0) {
 +        System.err.println("ERROR : Constraint violations occurred : " + e.getConstraintViolationSummaries());
 +      }
++      System.exit(1);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/04f81b50/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
----------------------------------------------------------------------
diff --cc fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
index aa24552,0000000..420533a
mode 100644,000000..100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
@@@ -1,310 -1,0 +1,317 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.fate.zookeeper;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.util.Collections;
 +import java.util.ConcurrentModificationException;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.log4j.Logger;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.KeeperException.Code;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +import org.apache.zookeeper.ZooKeeper;
 +import org.apache.zookeeper.data.Stat;
 +
 +/**
 + * Caches values stored in zookeeper and keeps them up to date as they change in zookeeper.
 + * 
 + */
 +public class ZooCache {
 +  private static final Logger log = Logger.getLogger(ZooCache.class);
 +  
 +  private ZCacheWatcher watcher = new ZCacheWatcher();
 +  private Watcher externalWatcher = null;
 +  
 +  private HashMap<String,byte[]> cache;
 +  private HashMap<String,Stat> statCache;
 +  private HashMap<String,List<String>> childrenCache;
 +  
 +  private ZooReader zReader;
 +  
 +  private ZooKeeper getZooKeeper() {
 +    return zReader.getZooKeeper();
 +  }
 +  
 +  private class ZCacheWatcher implements Watcher {
 +    @Override
 +    public void process(WatchedEvent event) {
 +      
 +      if (log.isTraceEnabled())
 +        log.trace(event);
 +      
 +      switch (event.getType()) {
 +        case NodeDataChanged:
 +        case NodeChildrenChanged:
 +        case NodeCreated:
 +        case NodeDeleted:
 +          remove(event.getPath());
 +          break;
 +        case None:
 +          switch (event.getState()) {
 +            case Disconnected:
 +              if (log.isTraceEnabled())
 +                log.trace("Zoo keeper connection disconnected, clearing cache");
 +              clear();
 +              break;
 +            case SyncConnected:
 +              break;
 +            case Expired:
 +              if (log.isTraceEnabled())
 +                log.trace("Zoo keeper connection expired, clearing cache");
 +              clear();
 +              break;
 +            default:
 +              log.warn("Unhandled: " + event);
 +          }
 +          break;
 +        default:
 +          log.warn("Unhandled: " + event);
 +      }
 +      
 +      if (externalWatcher != null) {
 +        externalWatcher.process(event);
 +      }
 +    }
 +  }
 +  
 +  public ZooCache(String zooKeepers, int sessionTimeout) {
 +    this(zooKeepers, sessionTimeout, null);
 +  }
 +  
 +  public ZooCache(String zooKeepers, int sessionTimeout, Watcher watcher) {
 +    this(new ZooReader(zooKeepers, sessionTimeout), watcher);
 +  }
 +  
 +  public ZooCache(ZooReader reader, Watcher watcher) {
 +    this.zReader = reader;
 +    this.cache = new HashMap<String,byte[]>();
 +    this.statCache = new HashMap<String,Stat>();
 +    this.childrenCache = new HashMap<String,List<String>>();
 +    this.externalWatcher = watcher;
 +  }
 +  
 +  private static interface ZooRunnable {
 +    void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException;
 +  }
 +  
 +  private synchronized void retry(ZooRunnable op) {
 +    
 +    int sleepTime = 100;
 +    
 +    while (true) {
 +      
 +      ZooKeeper zooKeeper = getZooKeeper();
 +      
 +      try {
 +        op.run(zooKeeper);
 +        return;
 +        
 +      } catch (KeeperException e) {
 +        if (e.code() == Code.NONODE) {
 +          log.error("Looked up non existant node in cache " + e.getPath(), e);
 +        }
 +        log.warn("Zookeeper error, will retry", e);
 +      } catch (InterruptedException e) {
 +        log.info("Zookeeper error, will retry", e);
 +      } catch (ConcurrentModificationException e) {
 +        log.debug("Zookeeper was modified, will retry");
 +      }
 +      
 +      try {
 +        // do not hold lock while sleeping
 +        wait(sleepTime);
 +      } catch (InterruptedException e) {
 +        e.printStackTrace();
 +      }
 +      if (sleepTime < 10000)
 +        sleepTime = (int) (sleepTime + sleepTime * Math.random());
 +      
 +    }
 +  }
 +  
 +  public synchronized List<String> getChildren(final String zPath) {
 +    
 +    ZooRunnable zr = new ZooRunnable() {
 +      
 +      @Override
 +      public void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
 +        
 +        if (childrenCache.containsKey(zPath))
 +          return;
 +        
 +        try {
 +          List<String> children = zooKeeper.getChildren(zPath, watcher);
 +          childrenCache.put(zPath, children);
 +        } catch (KeeperException ke) {
 +          if (ke.code() != Code.NONODE) {
 +            throw ke;
 +          }
 +        }
 +      }
 +      
 +    };
 +    
 +    retry(zr);
 +    
 +    List<String> children = childrenCache.get(zPath);
 +    if (children == null) {
 +      return null;
 +    }
 +    return Collections.unmodifiableList(children);
 +  }
 +  
 +  public synchronized byte[] get(final String zPath) {
 +    return get(zPath, null);
 +  }
 +  
 +  public synchronized byte[] get(final String zPath, Stat stat) {
 +    ZooRunnable zr = new ZooRunnable() {
 +      
 +      @Override
 +      public void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
 +        
 +        if (cache.containsKey(zPath))
 +          return;
 +        
 +        /*
 +         * The following call to exists() is important, since we are caching that a node does not exist. Once the node comes into existence, it will be added to
 +         * the cache. But this notification of a node coming into existence will only be given if exists() was previously called.
 +         * 
 +         * If the call to exists() is bypassed and only getData() is called with a special case that looks for Code.NONODE in the KeeperException, then
 +         * non-existence cannot be cached.
 +         */
 +        
 +        Stat stat = zooKeeper.exists(zPath, watcher);
 +        
 +        byte[] data = null;
 +        
 +        if (stat == null) {
 +          if (log.isTraceEnabled())
 +            log.trace("zookeeper did not contain " + zPath);
 +        } else {
 +          try {
 +            data = zooKeeper.getData(zPath, watcher, stat);
 +          } catch (KeeperException.BadVersionException e1) {
 +            throw new ConcurrentModificationException();
 +          } catch (KeeperException.NoNodeException e2) {
 +            throw new ConcurrentModificationException();
 +          }
 +          if (log.isTraceEnabled())
 +            log.trace("zookeeper contained " + zPath + " " + (data == null ? null : new String(data)));
 +        }
 +        if (log.isTraceEnabled())
 +          log.trace("putting " + zPath + " " + (data == null ? null : new String(data)) + " in cache");
 +        put(zPath, data, stat);
 +      }
 +      
 +    };
 +    
 +    retry(zr);
 +    
 +    if (stat != null) {
 +      Stat cstat = statCache.get(zPath);
 +      if (cstat != null) {
 +        try {
 +          ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +          DataOutputStream dos = new DataOutputStream(baos);
 +          cstat.write(dos);
 +          dos.close();
 +          
 +          ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
 +          DataInputStream dis = new DataInputStream(bais);
 +          stat.readFields(dis);
 +          
 +          dis.close();
 +        } catch (IOException e) {
 +          throw new RuntimeException(e);
 +        }
 +      }
 +    }
 +    
 +    return cache.get(zPath);
 +  }
 +  
 +  private synchronized void put(String zPath, byte[] data, Stat stat) {
 +    cache.put(zPath, data);
 +    statCache.put(zPath, stat);
 +  }
 +  
 +  private synchronized void remove(String zPath) {
 +    if (log.isTraceEnabled())
 +      log.trace("removing " + zPath + " from cache");
 +    cache.remove(zPath);
 +    childrenCache.remove(zPath);
 +    statCache.remove(zPath);
 +  }
 +  
 +  public synchronized void clear() {
 +    cache.clear();
 +    childrenCache.clear();
 +    statCache.clear();
 +  }
 +  
 +  public synchronized void clear(String zPath) {
 +    
 +    for (Iterator<String> i = cache.keySet().iterator(); i.hasNext();) {
 +      String path = i.next();
 +      if (path.startsWith(zPath))
 +        i.remove();
 +    }
 +    
 +    for (Iterator<String> i = childrenCache.keySet().iterator(); i.hasNext();) {
 +      String path = i.next();
 +      if (path.startsWith(zPath))
 +        i.remove();
 +    }
 +    
 +    for (Iterator<String> i = statCache.keySet().iterator(); i.hasNext();) {
 +      String path = i.next();
 +      if (path.startsWith(zPath))
 +        i.remove();
 +    }
 +  }
 +  
 +  private static Map<String,ZooCache> instances = new HashMap<String,ZooCache>();
 +  
 +  public static synchronized ZooCache getInstance(String zooKeepers, int sessionTimeout) {
 +    String key = zooKeepers + ":" + sessionTimeout;
 +    ZooCache zc = instances.get(key);
 +    if (zc == null) {
 +      zc = new ZooCache(zooKeepers, sessionTimeout);
 +      instances.put(key, zc);
 +    }
 +    
 +    return zc;
 +  }
++  
++  public void close() throws InterruptedException {
++    cache.clear();
++    statCache.clear();
++    childrenCache.clear();
++    zReader.close();
++  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/04f81b50/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
----------------------------------------------------------------------
diff --cc fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
index ab73012,0000000..e11f570
mode 100644,000000..100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
@@@ -1,105 -1,0 +1,109 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.fate.zookeeper;
 +
 +import java.util.List;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.Watcher;
 +import org.apache.zookeeper.ZooKeeper;
 +import org.apache.zookeeper.AsyncCallback.VoidCallback;
 +import org.apache.zookeeper.KeeperException.Code;
 +import org.apache.zookeeper.data.Stat;
 +
 +public class ZooReader implements IZooReader {
 +  
 +  protected String keepers;
 +  protected int timeout;
 +  
 +  protected ZooKeeper getSession(String keepers, int timeout, String scheme, byte[] auth) {
 +    return ZooSession.getSession(keepers, timeout, scheme, auth);
 +  }
 +  
 +  protected ZooKeeper getZooKeeper() {
 +    return getSession(keepers, timeout, null, null);
 +  }
 +  
 +  @Override
 +  public byte[] getData(String zPath, Stat stat) throws KeeperException, InterruptedException {
 +    return getZooKeeper().getData(zPath, false, stat);
 +  }
 +  
 +  @Override
 +  public Stat getStatus(String zPath) throws KeeperException, InterruptedException {
 +    return getZooKeeper().exists(zPath, false);
 +  }
 +  
 +  @Override
 +  public Stat getStatus(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
 +    return getZooKeeper().exists(zPath, watcher);
 +  }
 +  
 +  @Override
 +  public List<String> getChildren(String zPath) throws KeeperException, InterruptedException {
 +    return getZooKeeper().getChildren(zPath, false);
 +  }
 +  
 +  @Override
 +  public List<String> getChildren(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
 +    return getZooKeeper().getChildren(zPath, watcher);
 +  }
 +  
 +  @Override
 +  public boolean exists(String zPath) throws KeeperException, InterruptedException {
 +    return getZooKeeper().exists(zPath, false) != null;
 +  }
 +  
 +  @Override
 +  public boolean exists(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
 +    return getZooKeeper().exists(zPath, watcher) != null;
 +  }
 +  
 +  @Override
 +  public void sync(final String path) throws KeeperException, InterruptedException {
 +    final AtomicInteger rc = new AtomicInteger();
 +    final AtomicBoolean waiter = new AtomicBoolean(false);
 +    getZooKeeper().sync(path, new VoidCallback() {
 +      @Override
 +      public void processResult(int code, String arg1, Object arg2) {
 +        rc.set(code);
 +        synchronized (waiter) {
 +          waiter.set(true);
 +          waiter.notifyAll();
 +        }
 +      }}, null);
 +    synchronized (waiter) {
 +      while (!waiter.get())
 +        waiter.wait();
 +    }
 +    Code code = Code.get(rc.get());
 +    if (code != KeeperException.Code.OK) {
 +      throw KeeperException.create(code);
 +    }
 +  }  
 +  
 +  public ZooReader(String keepers, int timeout) {
 +    this.keepers = keepers;
 +    this.timeout = timeout;
 +  }
++
++  public void close() throws InterruptedException {
++    getZooKeeper().close();
++  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/04f81b50/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterGCTest.java
----------------------------------------------------------------------
diff --cc minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterGCTest.java
index a579397,0000000..a1f58f6
mode 100644,000000..100644
--- a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterGCTest.java
+++ b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterGCTest.java
@@@ -1,129 -1,0 +1,150 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.minicluster;
 +
 +import java.io.File;
 +import java.util.Map;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.ZooKeeperInstance;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.server.util.PortUtils;
 +import org.apache.commons.io.FileUtils;
 +import org.apache.commons.io.filefilter.SuffixFileFilter;
 +import org.apache.commons.io.filefilter.TrueFileFilter;
 +import org.apache.log4j.Level;
 +import org.apache.log4j.Logger;
- import org.junit.AfterClass;
 +import org.junit.Assert;
- import org.junit.BeforeClass;
++import org.junit.Ignore;
 +import org.junit.Test;
 +import org.junit.rules.TemporaryFolder;
 +
 +import com.google.common.collect.ImmutableMap;
++import com.google.common.io.Files;
 +
 +/**
 + * 
 + */
 +public class MiniAccumuloClusterGCTest {
 +  
++  @Test
++  public void testGcConfig() throws Exception {
++    File f = Files.createTempDir();
++    f.deleteOnExit();
++    try {
++      MiniAccumuloConfig macConfig = new MiniAccumuloConfig(f, passwd);
++      macConfig.setNumTservers(1);
++  
++      Assert.assertEquals(false, macConfig.shouldRunGC());
++      
++      // Turn on the garbage collector
++      macConfig.runGC(true);
++  
++      Assert.assertEquals(true, macConfig.shouldRunGC());
++    } finally {
++      if (null != f && f.exists()) {
++        f.delete();
++      }
++    }
++  }
++
++  
 +  private static TemporaryFolder tmpDir = new TemporaryFolder();
 +  private static MiniAccumuloConfig macConfig;
 +  private static MiniAccumuloCluster accumulo;
 +  private static final String passwd = "password";
 +  
-   @BeforeClass
 +  public static void setupMiniCluster() throws Exception {
 +    tmpDir.create();
 +    Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
 +    
 +    macConfig = new MiniAccumuloConfig(tmpDir.getRoot(), passwd);
 +    macConfig.setNumTservers(1);
 +    
 +    // Turn on the garbage collector
 +    macConfig.runGC(true);
 +    
 +    String gcPort = Integer.toString(PortUtils.getRandomFreePort());
 +    
 +    // And tweak the settings to make it run often
 +    Map<String,String> config = ImmutableMap.of(Property.GC_CYCLE_DELAY.getKey(), "1s", Property.GC_CYCLE_START.getKey(), "0s", Property.GC_PORT.getKey(), gcPort);
 +    macConfig.setSiteConfig(config);
 +    
 +    accumulo = new MiniAccumuloCluster(macConfig);
 +    accumulo.start();
 +  }
 +  
-   @AfterClass
 +  public static void tearDownMiniCluster() throws Exception {
 +    accumulo.stop();
 +    tmpDir.delete();
 +  }
 +  
-   @Test(timeout = 20000)
++  // This test seems to be a little too unstable for a unit test
++  @Ignore
 +  public void test() throws Exception {
 +    ZooKeeperInstance inst = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers());
 +    Connector c = inst.getConnector("root", new PasswordToken(passwd));
 +    
 +    final String table = "foobar";
 +    c.tableOperations().create(table);
 +    
 +    BatchWriter bw = null;
 +    
 +    // Add some data
 +    try {
 +      bw = c.createBatchWriter(table, new BatchWriterConfig());
 +      Mutation m = new Mutation("a");
 +      for (int i = 0; i < 50; i++) {
 +        m.put("colf", Integer.toString(i), "");
 +      }
 +      
 +      bw.addMutation(m);
 +    } finally {
 +      if (null != bw) {
 +        bw.close();
 +      }
 +    }
 +    
 +    final boolean flush = true, wait = true;
 +    
 +    // Compact the tables to get some rfiles which we can gc
 +    c.tableOperations().compact(table, null, null, flush, wait);
 +    c.tableOperations().compact("!METADATA", null, null, flush, wait);
 +    
 +    File accumuloDir = new File(tmpDir.getRoot().getAbsolutePath(), "accumulo");
 +    File tables = new File(accumuloDir.getAbsolutePath(), "tables");
 +    
 +    int fileCountAfterCompaction = FileUtils.listFiles(tables, new SuffixFileFilter(".rf"), TrueFileFilter.TRUE).size();
 +    
 +    // Sleep for 4s to let the GC do its thing
 +    for (int i = 1; i < 5; i++) {
 +      Thread.sleep(1000);
 +      int fileCountAfterGCWait = FileUtils.listFiles(tables, new SuffixFileFilter(".rf"), TrueFileFilter.TRUE).size();
 +
 +      if (fileCountAfterGCWait < fileCountAfterCompaction) {
 +        return;
 +      }
 +    }
 +    
 +    Assert.fail("Expected to find less files after compaction and pause for GC");
 +  }
 +  
 +}


[5/9] git commit: Merge branch '1.5.1-SNAPSHOT' of https://git-wip-us.apache.org/repos/asf/accumulo into 1.5.1-SNAPSHOT

Posted by uj...@apache.org.
Merge branch '1.5.1-SNAPSHOT' of https://git-wip-us.apache.org/repos/asf/accumulo into 1.5.1-SNAPSHOT

Conflicts:
	examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/036f381f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/036f381f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/036f381f

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 036f381fb263f6165a9cf62aa4f67d7dbf34c099
Parents: 04f81b5 6b87c87
Author: Bill Slacum <uj...@apache.org>
Authored: Mon Nov 18 14:38:26 2013 -0500
Committer: Bill Slacum <uj...@apache.org>
Committed: Mon Nov 18 14:38:26 2013 -0500

----------------------------------------------------------------------
 core/pom.xml                                    |   4 +
 .../client/impl/MultiTableBatchWriterImpl.java  | 166 ++++--
 .../accumulo/core/client/impl/Tables.java       |   7 +
 .../simple/client/RandomBatchWriter.java        |   4 -
 .../test/MultiTableBatchWriterTest.java         | 539 +++++++++++++++++++
 5 files changed, 683 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/036f381f/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
----------------------------------------------------------------------
diff --cc examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
index d78ae0e,f9acfd9..a640003
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
@@@ -121,12 -121,7 +121,8 @@@ public class RandomBatchWriter 
      Opts opts = new Opts();
      BatchWriterOpts bwOpts = new BatchWriterOpts();
      opts.parseArgs(RandomBatchWriter.class.getName(), args, bwOpts);
- 
 -    
 +    if ((opts.max - opts.min) < opts.num) {
 +      System.err.println(String.format("You must specify a min and a max that allow for at least num possible values. For example, you requested %d rows, but a min of %d and a max of %d only allows for %d rows.", opts.num, opts.min, opts.max, (opts.max - opts.min)));
-       System.exit(1);
-     }
-     
      Random r;
      if (opts.seed == null)
        r = new Random();


[4/9] git commit: Merge remote-tracking branch 'apache-committers/1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT

Posted by uj...@apache.org.
Merge remote-tracking branch 'apache-committers/1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT

Conflicts:
	core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
	core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
	core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
	examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
	fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
	server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/04f81b50
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/04f81b50
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/04f81b50

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 04f81b50d65b1d359cef2d38d5a6793dd6b4065f
Parents: ac20fe0 79d686f
Author: Bill Slacum <uj...@apache.org>
Authored: Mon Nov 18 14:35:38 2013 -0500
Committer: Bill Slacum <uj...@apache.org>
Committed: Mon Nov 18 14:35:38 2013 -0500

----------------------------------------------------------------------
 .../apache/accumulo/core/client/Instance.java   |   7 ++
 .../accumulo/core/client/ZooKeeperInstance.java | 113 +++++++++++++------
 .../core/client/impl/ThriftTransportPool.java   |  16 ++-
 .../accumulo/core/client/mock/MockInstance.java |   5 +
 .../apache/accumulo/core/util/ThriftUtil.java   |   4 +
 .../core/client/impl/TabletLocatorImplTest.java |   5 +
 .../simple/client/RandomBatchWriter.java        |  12 +-
 .../accumulo/fate/zookeeper/ZooCache.java       |   7 ++
 .../accumulo/fate/zookeeper/ZooReader.java      |   4 +
 .../minicluster/MiniAccumuloClusterGCTest.java  |  31 ++++-
 .../accumulo/server/client/HdfsZooInstance.java |   9 ++
 11 files changed, 164 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/04f81b50/core/src/main/java/org/apache/accumulo/core/client/Instance.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/Instance.java
index 3b04281,0000000..612301e
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Instance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Instance.java
@@@ -1,166 -1,0 +1,173 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client;
 +
 +import java.nio.ByteBuffer;
 +import java.util.List;
 +
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +
 +/**
 + * This class represents the information a client needs to know to connect to an instance of accumulo.
 + * 
 + */
 +public interface Instance {
 +  /**
 +   * Returns the location of the tablet server that is serving the root tablet.
 +   * 
 +   * @return location in "hostname:port" form
 +   */
 +  public abstract String getRootTabletLocation();
 +  
 +  /**
 +   * Returns the location(s) of the accumulo master and any redundant servers.
 +   * 
 +   * @return a list of locations in "hostname:port" form
 +   */
 +  public abstract List<String> getMasterLocations();
 +  
 +  /**
 +   * Returns a unique string that identifies this instance of accumulo.
 +   * 
 +   * @return a UUID
 +   */
 +  public abstract String getInstanceID();
 +  
 +  /**
 +   * Returns the instance name given at system initialization time.
 +   * 
 +   * @return current instance name
 +   */
 +  public abstract String getInstanceName();
 +  
 +  /**
 +   * Returns a comma-separated list of zookeeper servers the instance is using.
 +   * 
 +   * @return the zookeeper servers this instance is using in "hostname:port" form
 +   */
 +  public abstract String getZooKeepers();
 +  
 +  /**
 +   * Returns the zookeeper connection timeout.
 +   * 
 +   * @return the configured timeout to connect to zookeeper
 +   */
 +  public abstract int getZooKeepersSessionTimeOut();
 +  
 +  /**
 +   * Returns a connection to accumulo.
 +   * 
 +   * @param user
 +   *          a valid accumulo user
 +   * @param pass
 +   *          A UTF-8 encoded password. The password may be cleared after making this call.
 +   * @return the accumulo Connector
 +   * @throws AccumuloException
 +   *           when a generic exception occurs
 +   * @throws AccumuloSecurityException
 +   *           when a user's credentials are invalid
 +   * @deprecated since 1.5, use {@link #getConnector(String, AuthenticationToken)} with {@link PasswordToken}
 +   */
 +  @Deprecated
 +  public abstract Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException;
 +  
 +  /**
 +   * Returns a connection to accumulo.
 +   * 
 +   * @param auth
++   *          An AuthInfo object holding the user and password.
 +   * @return the accumulo Connector
 +   * @throws AccumuloException
 +   *           when a generic exception occurs
 +   * @throws AccumuloSecurityException
 +   *           when a user's credentials are invalid
 +   * @deprecated since 1.5, use {@link #getConnector(String, AuthenticationToken)} with {@link PasswordToken}
 +   */
 +  @Deprecated
 +  public abstract Connector getConnector(org.apache.accumulo.core.security.thrift.AuthInfo auth) throws AccumuloException, AccumuloSecurityException;
 +  
 +  /**
 +   * Returns a connection to accumulo.
 +   * 
 +   * @param user
 +   *          a valid accumulo user
 +   * @param pass
 +   *          A UTF-8 encoded password. The password may be cleared after making this call.
 +   * @return the accumulo Connector
 +   * @throws AccumuloException
 +   *           when a generic exception occurs
 +   * @throws AccumuloSecurityException
 +   *           when a user's credentials are invalid
 +   * @deprecated since 1.5, use {@link #getConnector(String, AuthenticationToken)} with {@link PasswordToken}
 +   */
 +  @Deprecated
 +  public abstract Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException;
 +  
 +  /**
 +   * Returns a connection to this instance of accumulo.
 +   * 
 +   * @param user
 +   *          a valid accumulo user
 +   * @param pass
 +   *          If a mutable CharSequence is passed in, it may be cleared after this call.
 +   * @return the accumulo Connector
 +   * @throws AccumuloException
 +   *           when a generic exception occurs
 +   * @throws AccumuloSecurityException
 +   *           when a user's credentials are invalid
 +   * @deprecated since 1.5, use {@link #getConnector(String, AuthenticationToken)} with {@link PasswordToken}
 +   */
 +  @Deprecated
 +  public abstract Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException;
++
++  /**
++   * Closes the instance and frees all associated resources. Reuse an Instance as much as possible, because it caches location information that improves
++   * performance.
++   * 
++   * @throws AccumuloException
++   *           if the underlying resources could not be closed cleanly
++   */
++  public abstract void close() throws AccumuloException;
 +  
 +  /**
 +   * Returns the AccumuloConfiguration to use when interacting with this instance.
 +   * 
 +   * @return the AccumuloConfiguration that specifies properties related to interacting with this instance
 +   */
 +  public abstract AccumuloConfiguration getConfiguration();
 +  
 +  /**
 +   * Set the AccumuloConfiguration to use when interacting with this instance.
 +   * 
 +   * @param conf
 +   *          accumulo configuration
 +   */
 +  public abstract void setConfiguration(AccumuloConfiguration conf);
 +  
 +  /**
 +   * Returns a connection to this instance of accumulo.
 +   * 
 +   * @param principal
 +   *          a valid accumulo user
 +   * @param token
 +   *          Use the token type configured for the Accumulo instance you are connecting to. An Accumulo instance with default configurations will use
 +   *          {@link PasswordToken}
 +   * @since 1.5.0
 +   */
 +  public abstract Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException;
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/04f81b50/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
index 8f62f8f,0000000..87f3a78
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
@@@ -1,311 -1,0 +1,352 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.UUID;
++import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.impl.ConnectorImpl;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.file.FileUtil;
 +import org.apache.accumulo.core.security.CredentialHelper;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.core.util.ArgumentChecker;
 +import org.apache.accumulo.core.util.ByteBufferUtil;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.core.util.OpTimer;
 +import org.apache.accumulo.core.util.TextUtil;
++import org.apache.accumulo.core.util.ThriftUtil;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.ZooCache;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Level;
 +import org.apache.log4j.Logger;
 +
 +/**
 + * <p>
 + * An implementation of instance that looks in zookeeper to find information needed to connect to an instance of accumulo.
 + * 
 + * <p>
 + * The advantage of using zookeeper to obtain information about accumulo is that zookeeper is highly available, very responsive, and supports caching.
 + * 
 + * <p>
 + * Because it is possible for multiple instances of accumulo to share a single set of zookeeper servers, all constructors require an accumulo instance name.
 + * 
 + * If you do not know the instance names then run accumulo org.apache.accumulo.server.util.ListInstances on an accumulo server.
 + * 
 + */
 +
 +public class ZooKeeperInstance implements Instance {
-   
++
 +  private static final Logger log = Logger.getLogger(ZooKeeperInstance.class);
-   
++
 +  private String instanceId = null;
 +  private String instanceName = null;
-   
++
 +  private final ZooCache zooCache;
-   
++
 +  private final String zooKeepers;
-   
++
 +  private final int zooKeepersSessionTimeOut;
-   
++
++  private volatile boolean closed = false;
++
 +  /**
 +   * 
 +   * @param instanceName
 +   *          The name of specific accumulo instance. This is set at initialization time.
 +   * @param zooKeepers
 +   *          A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
 +   */
-   
++
 +  public ZooKeeperInstance(String instanceName, String zooKeepers) {
 +    this(instanceName, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
 +  }
-   
++
 +  /**
 +   * 
 +   * @param instanceName
 +   *          The name of specific accumulo instance. This is set at initialization time.
 +   * @param zooKeepers
 +   *          A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
 +   * @param sessionTimeout
 +   *          zoo keeper session time out in milliseconds.
 +   */
-   
++
 +  public ZooKeeperInstance(String instanceName, String zooKeepers, int sessionTimeout) {
 +    ArgumentChecker.notNull(instanceName, zooKeepers);
 +    this.instanceName = instanceName;
 +    this.zooKeepers = zooKeepers;
 +    this.zooKeepersSessionTimeOut = sessionTimeout;
 +    zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout);
 +    getInstanceID();
++    clientInstances.incrementAndGet();
 +  }
-   
++
 +  /**
 +   * 
 +   * @param instanceId
 +   *          The UUID that identifies the accumulo instance you want to connect to.
 +   * @param zooKeepers
 +   *          A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
 +   */
-   
++
 +  public ZooKeeperInstance(UUID instanceId, String zooKeepers) {
 +    this(instanceId, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
 +  }
-   
++
 +  /**
 +   * 
 +   * @param instanceId
 +   *          The UUID that identifies the accumulo instance you want to connect to.
 +   * @param zooKeepers
 +   *          A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
 +   * @param sessionTimeout
 +   *          zoo keeper session time out in milliseconds.
 +   */
-   
++
 +  public ZooKeeperInstance(UUID instanceId, String zooKeepers, int sessionTimeout) {
 +    ArgumentChecker.notNull(instanceId, zooKeepers);
 +    this.instanceId = instanceId.toString();
 +    this.zooKeepers = zooKeepers;
 +    this.zooKeepersSessionTimeOut = sessionTimeout;
 +    zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout);
++    clientInstances.incrementAndGet();
 +  }
-   
++
 +  @Override
 +  public String getInstanceID() {
++    if (closed)
++      throw new RuntimeException("ZooKeeperInstance has been closed.");
 +    if (instanceId == null) {
 +      // want the instance id to be stable for the life of this instance object,
 +      // so only get it once
 +      String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName;
 +      byte[] iidb = zooCache.get(instanceNamePath);
 +      if (iidb == null) {
 +        throw new RuntimeException("Instance name " + instanceName
 +            + " does not exist in zookeeper.  Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list.");
 +      }
 +      instanceId = new String(iidb);
 +    }
-     
++
 +    if (zooCache.get(Constants.ZROOT + "/" + instanceId) == null) {
 +      if (instanceName == null)
 +        throw new RuntimeException("Instance id " + instanceId + " does not exist in zookeeper");
 +      throw new RuntimeException("Instance id " + instanceId + " pointed to by the name " + instanceName + " does not exist in zookeeper");
 +    }
-     
++
 +    return instanceId;
 +  }
-   
++
 +  @Override
 +  public List<String> getMasterLocations() {
++    if (closed)
++      throw new RuntimeException("ZooKeeperInstance has been closed.");
 +    String masterLocPath = ZooUtil.getRoot(this) + Constants.ZMASTER_LOCK;
-     
++
 +    OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up master location in zoocache.");
 +    byte[] loc = ZooUtil.getLockData(zooCache, masterLocPath);
 +    opTimer.stop("Found master at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
-     
++
 +    if (loc == null) {
 +      return Collections.emptyList();
 +    }
-     
++
 +    return Collections.singletonList(new String(loc));
 +  }
-   
++
 +  @Override
 +  public String getRootTabletLocation() {
++    if (closed)
++      throw new RuntimeException("ZooKeeperInstance has been closed.");
 +    String zRootLocPath = ZooUtil.getRoot(this) + Constants.ZROOT_TABLET_LOCATION;
-     
++
 +    OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up root tablet location in zookeeper.");
 +    byte[] loc = zooCache.get(zRootLocPath);
 +    opTimer.stop("Found root tablet at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
-     
++
 +    if (loc == null) {
 +      return null;
 +    }
-     
++
 +    return new String(loc).split("\\|")[0];
 +  }
-   
++
 +  @Override
 +  public String getInstanceName() {
++    if (closed)
++      throw new RuntimeException("ZooKeeperInstance has been closed.");
 +    if (instanceName == null)
 +      instanceName = lookupInstanceName(zooCache, UUID.fromString(getInstanceID()));
-     
++
 +    return instanceName;
 +  }
-   
++
 +  @Override
 +  public String getZooKeepers() {
 +    return zooKeepers;
 +  }
-   
++
 +  @Override
 +  public int getZooKeepersSessionTimeOut() {
 +    return zooKeepersSessionTimeOut;
 +  }
-   
++
 +  @Override
 +  @Deprecated
 +  public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(user, TextUtil.getBytes(new Text(pass.toString())));
 +  }
-   
++
 +  @Override
 +  @Deprecated
 +  public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(user, ByteBufferUtil.toBytes(pass));
 +  }
-   
++
 +  @Override
 +  public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(CredentialHelper.create(principal, token, getInstanceID()));
 +  }
 +  
 +  @SuppressWarnings("deprecation")
 +  private Connector getConnector(TCredentials credential) throws AccumuloException, AccumuloSecurityException {
 +    return new ConnectorImpl(this, credential);
 +  }
 +  
 +  @Override
 +  @Deprecated
 +  public Connector getConnector(String principal, byte[] pass) throws AccumuloException, AccumuloSecurityException {
-     return getConnector(principal, new PasswordToken(pass));
++    if (closed) {
++      throw new RuntimeException("ZooKeeperInstance has been closed.");
++    } else {
++      return getConnector(principal, new PasswordToken(pass));
++    }
 +  }
-   
++
 +  private AccumuloConfiguration conf = null;
-   
++
 +  @Override
 +  public AccumuloConfiguration getConfiguration() {
 +    if (conf == null)
 +      conf = AccumuloConfiguration.getDefaultConfiguration();
 +    return conf;
 +  }
-   
++
 +  @Override
 +  public void setConfiguration(AccumuloConfiguration conf) {
 +    this.conf = conf;
 +  }
-   
++
 +  /**
 +   * @deprecated Use {@link #lookupInstanceName(org.apache.accumulo.fate.zookeeper.ZooCache, UUID)} instead
 +   */
 +  @Deprecated
 +  public static String lookupInstanceName(org.apache.accumulo.core.zookeeper.ZooCache zooCache, UUID instanceId) {
 +    return lookupInstanceName((ZooCache) zooCache, instanceId);
 +  }
 +  
 +  /**
 +   * Given a zooCache and instanceId, look up the instance name.
 +   * 
 +   * @param zooCache
++   *          cache used to read the instance names registered in zookeeper
 +   * @param instanceId
++   *          id of the instance whose name is wanted
++   * @return the instance name, or null if no registered name maps to the given id
 +   */
 +  public static String lookupInstanceName(ZooCache zooCache, UUID instanceId) {
 +    ArgumentChecker.notNull(zooCache, instanceId);
 +    for (String name : zooCache.getChildren(Constants.ZROOT + Constants.ZINSTANCES)) {
 +      String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + name;
++      byte[] iidb = zooCache.get(instanceNamePath);
++      if (iidb == null) {
++        // zooCache.get yields null for a missing node (getInstanceID checks for the same thing); the name may have
++        // been removed after the children were listed, so skip it instead of failing with a NullPointerException
++        continue;
++      }
++      UUID iid = UUID.fromString(new String(iidb));
 +      if (iid.equals(instanceId)) {
 +        return name;
 +      }
 +    }
 +    return null;
 +  }
 +  
 +  /**
 +   * To be moved to server code. Only lives here to support certain client side utilities to minimize command-line options.
 +   */
 +  @Deprecated
 +  public static String getInstanceIDFromHdfs(Path instanceDirectory) {
 +    try {
 +      FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), AccumuloConfiguration.getSiteConfiguration());
 +      FileStatus[] files = null;
 +      try {
 +        files = fs.listStatus(instanceDirectory);
 +      } catch (FileNotFoundException ex) {
 +        // ignored
 +      }
 +      log.debug("Trying to read instance id from " + instanceDirectory);
 +      if (files == null || files.length == 0) {
 +        log.error("unable obtain instance id at " + instanceDirectory);
 +        throw new RuntimeException("Accumulo not initialized, there is no instance id at " + instanceDirectory);
 +      } else if (files.length != 1) {
 +        log.error("multiple potential instances in " + instanceDirectory);
 +        throw new RuntimeException("Accumulo found multiple possible instance ids in " + instanceDirectory);
 +      } else {
 +        String result = files[0].getPath().getName();
 +        return result;
 +      }
 +    } catch (IOException e) {
 +      throw new RuntimeException("Accumulo not initialized, there is no instance id at " + instanceDirectory, e);
 +    }
 +  }
 +  
 +  @Deprecated
 +  @Override
 +  public Connector getConnector(org.apache.accumulo.core.security.thrift.AuthInfo auth) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(auth.user, auth.password);
 +  }
++
++  static private final AtomicInteger clientInstances = new AtomicInteger(0);
++
++  /**
++   * Releases this instance's share of the client-wide resources. Every constructed ZooKeeperInstance increments a shared counter; when the last open
++   * instance is closed, the ZooCache and the thrift resources are shut down. Calling close() again on an already closed instance does nothing.
++   * 
++   * @throws AccumuloException
++   *           if the thread is interrupted while shutting the shared resources down; the instance then stays open so close() can be retried
++   */
++  @Override
++  public synchronized void close() throws AccumuloException {
++    if (closed)
++      return;
++    if (clientInstances.decrementAndGet() == 0) {
++      try {
++        zooCache.close();
++        ThriftUtil.close();
++      } catch (InterruptedException e) {
++        // give the reference back so a later close() attempts the shutdown again
++        clientInstances.incrementAndGet();
++        // restore the interrupt status for callers further up the stack
++        Thread.currentThread().interrupt();
++        throw new AccumuloException("Issues closing ZooKeeper.", e);
++      }
++    }
++    // mark closed even when other instances remain, so a repeated close() cannot decrement the counter twice
++    closed = true;
++  }
++
++  /**
++   * Logs a warning when this instance is finalized without having been closed. No resources are released here.
++   */
++  @Override
++  public void finalize() {
++    // Deliberately performs no cleanup: users need to explicitly close Instances if they want things cleaned up nicely. Only the omission is reported.
++    if (!closed)
++      log.warn("ZooKeeperInstance being cleaned up without being closed. Please remember to call close() before dereferencing to clean up threads.");
++  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/04f81b50/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index ceeab21,0000000..41b2527
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@@ -1,607 -1,0 +1,617 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.impl;
 +
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.security.SecurityPermission;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.util.AddressUtil;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.TTimeoutTransport;
 +import org.apache.accumulo.core.util.ThriftUtil;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +
 +public class ThriftTransportPool {
 +  private static SecurityPermission TRANSPORT_POOL_PERMISSION = new SecurityPermission("transportPoolPermission");
 +  
 +  private static final Random random = new Random();
 +  private long killTime = 1000 * 3;
 +  
 +  private Map<ThriftTransportKey,List<CachedConnection>> cache = new HashMap<ThriftTransportKey,List<CachedConnection>>();
 +  private Map<ThriftTransportKey,Long> errorCount = new HashMap<ThriftTransportKey,Long>();
 +  private Map<ThriftTransportKey,Long> errorTime = new HashMap<ThriftTransportKey,Long>();
 +  private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<ThriftTransportKey>();
 +  
 +  private static final Logger log = Logger.getLogger(ThriftTransportPool.class);
 +  
 +  private static final Long ERROR_THRESHOLD = 20l;
 +  private static final int STUCK_THRESHOLD = 2 * 60 * 1000;
 +  
 +  private static class CachedConnection {
 +    
 +    public CachedConnection(CachedTTransport t) {
 +      this.transport = t;
 +    }
 +    
 +    void setReserved(boolean reserved) {
 +      this.transport.setReserved(reserved);
 +    }
 +    
 +    boolean isReserved() {
 +      return this.transport.reserved;
 +    }
 +    
 +    CachedTTransport transport;
 +    
 +    long lastReturnTime;
 +  }
 +  
 +  private static class Closer implements Runnable {
 +    final ThriftTransportPool pool;
++    final AtomicBoolean stop;
 +    
-     public Closer(ThriftTransportPool pool) {
++    public Closer(ThriftTransportPool pool, AtomicBoolean stop) {
 +      this.pool = pool;
++      this.stop = stop;
 +    }
 +    
 +    public void run() {
-       while (true) {
++      while (!stop.get()) {
 +        
 +        ArrayList<CachedConnection> connectionsToClose = new ArrayList<CachedConnection>();
 +        
 +        synchronized (pool) {
 +          for (List<CachedConnection> ccl : pool.cache.values()) {
 +            Iterator<CachedConnection> iter = ccl.iterator();
 +            while (iter.hasNext()) {
 +              CachedConnection cachedConnection = iter.next();
 +              
 +              if (!cachedConnection.isReserved() && System.currentTimeMillis() - cachedConnection.lastReturnTime > pool.killTime) {
 +                connectionsToClose.add(cachedConnection);
 +                iter.remove();
 +              }
 +            }
 +          }
 +          
 +          for (List<CachedConnection> ccl : pool.cache.values()) {
 +            for (CachedConnection cachedConnection : ccl) {
 +              cachedConnection.transport.checkForStuckIO(STUCK_THRESHOLD);
 +            }
 +          }
 +          
 +          Iterator<Entry<ThriftTransportKey,Long>> iter = pool.errorTime.entrySet().iterator();
 +          while (iter.hasNext()) {
 +            Entry<ThriftTransportKey,Long> entry = iter.next();
 +            long delta = System.currentTimeMillis() - entry.getValue();
 +            if (delta >= STUCK_THRESHOLD) {
 +              pool.errorCount.remove(entry.getKey());
 +              iter.remove();
 +            }
 +          }
 +        }
 +        
 +        // close connections outside of sync block
 +        for (CachedConnection cachedConnection : connectionsToClose) {
 +          cachedConnection.transport.close();
 +        }
 +        
 +        try {
 +          Thread.sleep(500);
 +        } catch (InterruptedException e) {
 +          e.printStackTrace();
 +        }
 +      }
 +    }
 +  }
 +  
 +  static class CachedTTransport extends TTransport {
 +    
 +    private ThriftTransportKey cacheKey;
 +    private TTransport wrappedTransport;
 +    private boolean sawError = false;
 +    
 +    private volatile String ioThreadName = null;
 +    private volatile long ioStartTime = 0;
 +    private volatile boolean reserved = false;
 +    
 +    private String stuckThreadName = null;
 +    
 +    int ioCount = 0;
 +    int lastIoCount = -1;
 +    
 +    private void sawError(Exception e) {
 +      sawError = true;
 +    }
 +    
 +    final void setReserved(boolean reserved) {
 +      this.reserved = reserved;
 +      if (reserved) {
 +        ioThreadName = Thread.currentThread().getName();
 +        ioCount = 0;
 +        lastIoCount = -1;
 +      } else {
 +        if ((ioCount & 1) == 1) {
 +          // connection unreserved, but it seems io may still be
 +          // happening
 +          log.warn("Connection returned to thrift connection pool that may still be in use " + ioThreadName + " " + Thread.currentThread().getName(),
 +              new Exception());
 +        }
 +        
 +        ioCount = 0;
 +        lastIoCount = -1;
 +        ioThreadName = null;
 +      }
 +      checkForStuckIO(STUCK_THRESHOLD);
 +    }
 +    
 +    final void checkForStuckIO(long threshold) {
 +      /*
 +       * checking for stuck io needs to be light weight.
 +       * 
 +       * Tried to call System.currentTimeMillis() and Thread.currentThread() before every io operation.... this dramatically slowed things down. So switched to
 +       * incrementing a counter before and after each io operation.
 +       */
 +      
 +      if ((ioCount & 1) == 1) {
 +        // when ioCount is odd, it means I/O is currently happening
 +        if (ioCount == lastIoCount) {
 +          // still doing same I/O operation as last time this
 +          // functions was called
 +          long delta = System.currentTimeMillis() - ioStartTime;
 +          if (delta >= threshold && stuckThreadName == null) {
 +            stuckThreadName = ioThreadName;
 +            log.warn("Thread \"" + ioThreadName + "\" stuck on IO  to " + cacheKey + " for at least " + delta + " ms");
 +          }
 +        } else {
 +          // remember this ioCount and the time we saw it, need to see
 +          // if it changes
 +          lastIoCount = ioCount;
 +          ioStartTime = System.currentTimeMillis();
 +          
 +          if (stuckThreadName != null) {
 +            // doing I/O, but ioCount changed so no longer stuck
 +            log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO  to " + cacheKey + " sawError = " + sawError);
 +            stuckThreadName = null;
 +          }
 +        }
 +      } else {
 +        // I/O is not currently happening
 +        if (stuckThreadName != null) {
 +          // no longer stuck, and was stuck in the past
 +          log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO  to " + cacheKey + " sawError = " + sawError);
 +          stuckThreadName = null;
 +        }
 +      }
 +    }
 +    
 +    public CachedTTransport(TTransport transport, ThriftTransportKey cacheKey2) {
 +      this.wrappedTransport = transport;
 +      this.cacheKey = cacheKey2;
 +    }
 +    
 +    public boolean isOpen() {
 +      return wrappedTransport.isOpen();
 +    }
 +    
 +    public void open() throws TTransportException {
 +      try {
 +        ioCount++;
 +        wrappedTransport.open();
 +      } catch (TTransportException tte) {
 +        sawError(tte);
 +        throw tte;
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public int read(byte[] arg0, int arg1, int arg2) throws TTransportException {
 +      try {
 +        ioCount++;
 +        return wrappedTransport.read(arg0, arg1, arg2);
 +      } catch (TTransportException tte) {
 +        sawError(tte);
 +        throw tte;
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public int readAll(byte[] arg0, int arg1, int arg2) throws TTransportException {
 +      try {
 +        ioCount++;
 +        return wrappedTransport.readAll(arg0, arg1, arg2);
 +      } catch (TTransportException tte) {
 +        sawError(tte);
 +        throw tte;
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public void write(byte[] arg0, int arg1, int arg2) throws TTransportException {
 +      try {
 +        ioCount++;
 +        wrappedTransport.write(arg0, arg1, arg2);
 +      } catch (TTransportException tte) {
 +        sawError(tte);
 +        throw tte;
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public void write(byte[] arg0) throws TTransportException {
 +      try {
 +        ioCount++;
 +        wrappedTransport.write(arg0);
 +      } catch (TTransportException tte) {
 +        sawError(tte);
 +        throw tte;
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public void close() {
 +      try {
 +        ioCount++;
 +        wrappedTransport.close();
 +      } finally {
 +        ioCount++;
 +      }
 +      
 +    }
 +    
 +    public void flush() throws TTransportException {
 +      try {
 +        ioCount++;
 +        wrappedTransport.flush();
 +      } catch (TTransportException tte) {
 +        sawError(tte);
 +        throw tte;
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public boolean peek() {
 +      try {
 +        ioCount++;
 +        return wrappedTransport.peek();
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public byte[] getBuffer() {
 +      try {
 +        ioCount++;
 +        return wrappedTransport.getBuffer();
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public int getBufferPosition() {
 +      try {
 +        ioCount++;
 +        return wrappedTransport.getBufferPosition();
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public int getBytesRemainingInBuffer() {
 +      try {
 +        ioCount++;
 +        return wrappedTransport.getBytesRemainingInBuffer();
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public void consumeBuffer(int len) {
 +      try {
 +        ioCount++;
 +        wrappedTransport.consumeBuffer(len);
 +      } finally {
 +        ioCount++;
 +      }
 +    }
 +    
 +    public ThriftTransportKey getCacheKey() {
 +      return cacheKey;
 +    }
 +    
 +  }
 +  
 +  private ThriftTransportPool() {}
 +  
 +  public TTransport getTransport(String location, int port) throws TTransportException {
 +    return getTransport(location, port, 0);
 +  }
 +  
 +  public TTransport getTransportWithDefaultTimeout(InetSocketAddress addr, AccumuloConfiguration conf) throws TTransportException {
 +    return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
 +  }
 +  
 +  public TTransport getTransport(InetSocketAddress addr, long timeout) throws TTransportException {
 +    return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), timeout);
 +  }
 +  
 +  public TTransport getTransportWithDefaultTimeout(String location, int port, AccumuloConfiguration conf) throws TTransportException {
 +    return getTransport(location, port, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
 +  }
 +  
 +  Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean preferCachedConnection) throws TTransportException {
 +    
 +    servers = new ArrayList<ThriftTransportKey>(servers);
 +    
 +    if (preferCachedConnection) {
 +      HashSet<ThriftTransportKey> serversSet = new HashSet<ThriftTransportKey>(servers);
 +      
 +      synchronized (this) {
 +        
 +        // randomly pick a server from the connection cache
 +        serversSet.retainAll(cache.keySet());
 +        
 +        if (serversSet.size() > 0) {
 +          ArrayList<ThriftTransportKey> cachedServers = new ArrayList<ThriftTransportKey>(serversSet);
 +          Collections.shuffle(cachedServers, random);
 +          
 +          for (ThriftTransportKey ttk : cachedServers) {
 +            for (CachedConnection cachedConnection : cache.get(ttk)) {
 +              if (!cachedConnection.isReserved()) {
 +                cachedConnection.setReserved(true);
 +                if (log.isTraceEnabled())
 +                  log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort());
 +                return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport);
 +              }
 +            }
 +          }
 +        }
 +      }
 +    }
 +    
 +    int retryCount = 0;
 +    while (servers.size() > 0 && retryCount < 10) {
 +      int index = random.nextInt(servers.size());
 +      ThriftTransportKey ttk = servers.get(index);
 +      
 +      if (!preferCachedConnection) {
 +        synchronized (this) {
 +          List<CachedConnection> cachedConnList = cache.get(ttk);
 +          if (cachedConnList != null) {
 +            for (CachedConnection cachedConnection : cachedConnList) {
 +              if (!cachedConnection.isReserved()) {
 +                cachedConnection.setReserved(true);
 +                if (log.isTraceEnabled())
 +                  log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort() + " timeout " + ttk.getTimeout());
 +                return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport);
 +              }
 +            }
 +          }
 +        }
 +      }
 +
 +      try {
 +        return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), createNewTransport(ttk));
 +      } catch (TTransportException tte) {
 +        log.debug("Failed to connect to " + servers.get(index), tte);
 +        servers.remove(index);
 +        retryCount++;
 +      }
 +    }
 +    
 +    throw new TTransportException("Failed to connect to a server");
 +  }
 +  
 +  public TTransport getTransport(String location, int port, long milliseconds) throws TTransportException {
 +    return getTransport(new ThriftTransportKey(location, port, milliseconds));
 +  }
 +  
 +  private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException {
 +    synchronized (this) {
 +      // atomically reserve the location if it exists in the cache
 +      List<CachedConnection> ccl = cache.get(cacheKey);
 +      
 +      if (ccl == null) {
 +        ccl = new LinkedList<CachedConnection>();
 +        cache.put(cacheKey, ccl);
 +      }
 +      
 +      for (CachedConnection cachedConnection : ccl) {
 +        if (!cachedConnection.isReserved()) {
 +          cachedConnection.setReserved(true);
 +          if (log.isTraceEnabled())
 +            log.trace("Using existing connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
 +          return cachedConnection.transport;
 +        }
 +      }
 +    }
 +    
 +    return createNewTransport(cacheKey);
 +  }
 +  
 +  private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException {
 +    TTransport transport;
 +    if (cacheKey.getTimeout() == 0) {
 +      transport = AddressUtil.createTSocket(cacheKey.getLocation(), cacheKey.getPort());
 +    } else {
 +      try {
 +        transport = TTimeoutTransport.create(AddressUtil.parseAddress(cacheKey.getLocation(), cacheKey.getPort()), cacheKey.getTimeout());
 +      } catch (IOException ex) {
 +        throw new TTransportException(ex);
 +      }
 +    }
 +    transport = ThriftUtil.transportFactory().getTransport(transport);
 +    transport.open();
 +    
 +    if (log.isTraceEnabled())
 +      log.trace("Creating new connection to connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
 +    
 +    CachedTTransport tsc = new CachedTTransport(transport, cacheKey);
 +    
 +    CachedConnection cc = new CachedConnection(tsc);
 +    cc.setReserved(true);
 +    
 +    synchronized (this) {
 +      List<CachedConnection> ccl = cache.get(cacheKey);
 +      
 +      if (ccl == null) {
 +        ccl = new LinkedList<CachedConnection>();
 +        cache.put(cacheKey, ccl);
 +      }
 +      
 +      ccl.add(cc);
 +    }
 +    return cc.transport;
 +  }
 +  
 +  public void returnTransport(TTransport tsc) {
 +    if (tsc == null) {
 +      return;
 +    }
 +    
 +    boolean existInCache = false;
 +    CachedTTransport ctsc = (CachedTTransport) tsc;
 +    
 +    ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>();
 +
 +    synchronized (this) {
 +      List<CachedConnection> ccl = cache.get(ctsc.getCacheKey());
 +      for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
 +        CachedConnection cachedConnection = iterator.next();
 +        if (cachedConnection.transport == tsc) {
 +          if (ctsc.sawError) {
 +            closeList.add(cachedConnection);
 +            iterator.remove();
 +            
 +            if (log.isTraceEnabled())
 +              log.trace("Returned connection had error " + ctsc.getCacheKey());
 +            
 +            Long ecount = errorCount.get(ctsc.getCacheKey());
 +            if (ecount == null)
 +              ecount = 0l;
 +            ecount++;
 +            errorCount.put(ctsc.getCacheKey(), ecount);
 +            
 +            Long etime = errorTime.get(ctsc.getCacheKey());
 +            if (etime == null) {
 +              errorTime.put(ctsc.getCacheKey(), System.currentTimeMillis());
 +            }
 +            
 +            if (ecount >= ERROR_THRESHOLD && !serversWarnedAbout.contains(ctsc.getCacheKey())) {
 +              log.warn("Server " + ctsc.getCacheKey() + " had " + ecount + " failures in a short time period, will not complain anymore ");
 +              serversWarnedAbout.add(ctsc.getCacheKey());
 +            }
 +            
 +            cachedConnection.setReserved(false);
 +            
 +          } else {
 +            
 +            if (log.isTraceEnabled())
 +              log.trace("Returned connection " + ctsc.getCacheKey() + " ioCount : " + cachedConnection.transport.ioCount);
 +            
 +            cachedConnection.lastReturnTime = System.currentTimeMillis();
 +            cachedConnection.setReserved(false);
 +          }
 +          existInCache = true;
 +          break;
 +        }
 +      }
 +      
 +      // remove all unreserved cached connections when a server has an error, not just the connection that was returned
 +      if (ctsc.sawError) {
 +        for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
 +          CachedConnection cachedConnection = iterator.next();
 +          if (!cachedConnection.isReserved()) {
 +            closeList.add(cachedConnection);
 +            iterator.remove();
 +          }
 +        }
 +      }
 +    }
 +    
 +    // close outside of sync block
 +    for (CachedConnection cachedConnection : closeList) {
 +      try {
 +        cachedConnection.transport.close();
 +      } catch (Exception e) {
 +        log.debug("Failed to close connection w/ errors", e);
 +      }
 +    }
 +    
 +    if (!existInCache) {
 +      log.warn("Returned tablet server connection to cache that did not come from cache");
 +      // close outside of sync block
 +      tsc.close();
 +    }
 +  }
 +  
 +  /**
 +   * Set the time after which idle connections should be closed
 +   * 
 +   * @param time
 +   */
 +  public synchronized void setIdleTime(long time) {
 +    this.killTime = time;
 +    log.debug("Set thrift transport pool idle time to " + time);
 +  }
 +
 +  private static ThriftTransportPool instance = new ThriftTransportPool();
 +  private static final AtomicBoolean daemonStarted = new AtomicBoolean(false);
++  private static AtomicBoolean stopDaemon;
 +  
 +  public static ThriftTransportPool getInstance() {
 +    SecurityManager sm = System.getSecurityManager();
 +    if (sm != null) {
 +      sm.checkPermission(TRANSPORT_POOL_PERMISSION);
 +    }
 +    
 +    if (daemonStarted.compareAndSet(false, true)) {
-       new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start();
++      stopDaemon = new AtomicBoolean(false);
++      new Daemon(new Closer(instance, stopDaemon), "Thrift Connection Pool Checker").start();
 +    }
 +    return instance;
 +  }
++  
++  public static void close() {
++    if (daemonStarted.compareAndSet(true, false)) {
++      stopDaemon.set(true);
++    }
++  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/04f81b50/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
index c0829df,0000000..55213ef
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
@@@ -1,166 -1,0 +1,171 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.mock;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.util.ByteBufferUtil;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.core.util.TextUtil;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.io.Text;
 +
 +/**
 + * Mock Accumulo provides an in memory implementation of the Accumulo client API. It is possible that the behavior of this implementation may differ subtly from
 + * the behavior of Accumulo. This could result in unit tests that pass on Mock Accumulo and fail on Accumulo or vice versa. Documenting the differences would be
 + * difficult and is not done.
 + * 
 + * <p>
 + * An alternative to Mock Accumulo called MiniAccumuloCluster was introduced in Accumulo 1.5. MiniAccumuloCluster spins up actual Accumulo server processes, can
 + * be used for unit testing, and its behavior should match Accumulo. The drawback of MiniAccumuloCluster is that it starts more slowly than Mock Accumulo.
 + * 
 + */
 +
 +public class MockInstance implements Instance {
 +  
 +  static final String genericAddress = "localhost:1234";
 +  static final Map<String,MockAccumulo> instances = new HashMap<String,MockAccumulo>();
 +  MockAccumulo acu;
 +  String instanceName;
 +  
 +  public MockInstance() {
 +    acu = new MockAccumulo(getDefaultFileSystem());
 +    instanceName = "mock-instance";
 +  }
 +  
 +  static FileSystem getDefaultFileSystem() {
 +    try {
 +      Configuration conf = CachedConfiguration.getInstance();
 +      conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
 +      conf.set("fs.default.name", "file:///");
 +      return FileSystem.get(CachedConfiguration.getInstance());
 +    } catch (IOException ex) {
 +      throw new RuntimeException(ex);
 +    }
 +  }
 +  
 +  public MockInstance(String instanceName) {
 +    this(instanceName, getDefaultFileSystem());
 +  }
 +  
 +  public MockInstance(String instanceName, FileSystem fs) {
 +    synchronized (instances) {
 +      if (instances.containsKey(instanceName))
 +        acu = instances.get(instanceName);
 +      else
 +        instances.put(instanceName, acu = new MockAccumulo(fs));
 +    }
 +    this.instanceName = instanceName;
 +  }
 +  
 +  @Override
 +  public String getRootTabletLocation() {
 +    return genericAddress;
 +  }
 +  
 +  @Override
 +  public List<String> getMasterLocations() {
 +    return Collections.singletonList(genericAddress);
 +  }
 +  
 +  @Override
 +  public String getInstanceID() {
 +    return "mock-instance-id";
 +  }
 +  
 +  @Override
 +  public String getInstanceName() {
 +    return instanceName;
 +  }
 +  
 +  @Override
 +  public String getZooKeepers() {
 +    return "localhost";
 +  }
 +  
 +  @Override
 +  public int getZooKeepersSessionTimeOut() {
 +    return 30 * 1000;
 +  }
 +  
 +  @Override
 +  @Deprecated
 +  public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(user, new PasswordToken(pass));
 +  }
 +  
 +  @Override
 +  @Deprecated
 +  public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(user, ByteBufferUtil.toBytes(pass));
 +  }
 +  
 +  @Override
 +  @Deprecated
 +  public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(user, TextUtil.getBytes(new Text(pass.toString())));
 +  }
 +  
 +  AccumuloConfiguration conf = null;
 +  
 +  @Override
 +  public AccumuloConfiguration getConfiguration() {
 +    if (conf == null)
 +      conf = AccumuloConfiguration.getDefaultConfiguration();
 +    return conf;
 +  }
 +  
 +  @Override
 +  public void setConfiguration(AccumuloConfiguration conf) {
 +    this.conf = conf;
 +  }
 +  
 +  @Deprecated
 +  @Override
 +  public Connector getConnector(org.apache.accumulo.core.security.thrift.AuthInfo auth) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(auth.user, auth.password);
 +  }
 +  
 +  @Override
 +  public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
 +    Connector conn = new MockConnector(principal, acu, this);
 +    if (!acu.users.containsKey(principal))
 +      conn.securityOperations().createLocalUser(principal, (PasswordToken) token);
 +    else if (!acu.users.get(principal).token.equals(token))
 +      throw new AccumuloSecurityException(principal, SecurityErrorCode.BAD_CREDENTIALS);
 +    return conn;
 +  }
++
++  @Override
++  public void close() throws AccumuloException {
++    // NOOP
++  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/04f81b50/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
index 9bffc81,0000000..881cdfc
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
@@@ -1,223 -1,0 +1,227 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.util;
 +
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.util.HashMap;
 +import java.util.Map;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.impl.ClientExec;
 +import org.apache.accumulo.core.client.impl.ClientExecReturn;
 +import org.apache.accumulo.core.client.impl.ThriftTransportPool;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.accumulo.trace.instrument.Span;
 +import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.TServiceClient;
 +import org.apache.thrift.TServiceClientFactory;
 +import org.apache.thrift.protocol.TCompactProtocol;
 +import org.apache.thrift.protocol.TMessage;
 +import org.apache.thrift.protocol.TProtocol;
 +import org.apache.thrift.protocol.TProtocolFactory;
 +import org.apache.thrift.transport.TFramedTransport;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +import org.apache.thrift.transport.TTransportFactory;
 +
 +
 +public class ThriftUtil {
 +  private static final Logger log = Logger.getLogger(ThriftUtil.class);
 +
 +  public static class TraceProtocol extends TCompactProtocol {
 +
 +    @Override
 +    public void writeMessageBegin(TMessage message) throws TException {
 +      Trace.start("client:" + message.name);
 +      super.writeMessageBegin(message);
 +    }
 +
 +    @Override
 +    public void writeMessageEnd() throws TException {
 +      super.writeMessageEnd();
 +      Span currentTrace = Trace.currentTrace();
 +      if (currentTrace != null)
 +        currentTrace.stop();
 +    }
 +
 +    public TraceProtocol(TTransport transport) {
 +      super(transport);
 +    }
 +  }
 +  
 +  public static class TraceProtocolFactory extends TCompactProtocol.Factory {
 +    private static final long serialVersionUID = 1L;
 +
 +    @Override
 +    public TProtocol getProtocol(TTransport trans) {
 +      return new TraceProtocol(trans);
 +    }
 +  }
 +  
 +  static private TProtocolFactory protocolFactory = new TraceProtocolFactory();
 +  static private TTransportFactory transportFactory = new TFramedTransport.Factory(Integer.MAX_VALUE);
 +  
 +  static public <T extends TServiceClient> T createClient(TServiceClientFactory<T> factory, TTransport transport) {
 +    return factory.getClient(protocolFactory.getProtocol(transport), protocolFactory.getProtocol(transport));
 +  }
 +  
 +  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, InetSocketAddress address, AccumuloConfiguration conf)
 +      throws TTransportException {
 +    return createClient(factory, ThriftTransportPool.getInstance().getTransportWithDefaultTimeout(address, conf));
 +  }
 +  
 +  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, Property property, AccumuloConfiguration configuration)
 +      throws TTransportException {
 +    int port = configuration.getPort(property);
 +    TTransport transport = ThriftTransportPool.getInstance().getTransport(address, port);
 +    return createClient(factory, transport);
 +  }
 +  
 +  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, Property property, Property timeoutProperty,
 +      AccumuloConfiguration configuration) throws TTransportException {
 +    return getClient(factory, address, property, configuration.getTimeInMillis(timeoutProperty), configuration);
 +  }
 +  
 +  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, Property property, long timeout,
 +      AccumuloConfiguration configuration) throws TTransportException {
 +    int port = configuration.getPort(property);
 +    TTransport transport = ThriftTransportPool.getInstance().getTransport(address, port, timeout);
 +    return createClient(factory, transport);
 +  }
 +  
 +  static public void returnClient(TServiceClient iface) { // Eew... the typing here is horrible
 +    if (iface != null) {
 +      ThriftTransportPool.getInstance().returnTransport(iface.getInputProtocol().getTransport());
 +    }
 +  }
 +  
 +  static public TabletClientService.Client getTServerClient(String address, AccumuloConfiguration conf) throws TTransportException {
 +    return getClient(new TabletClientService.Client.Factory(), address, Property.TSERV_CLIENTPORT, Property.GENERAL_RPC_TIMEOUT, conf);
 +  }
 +  
 +  static public TabletClientService.Client getTServerClient(String address, AccumuloConfiguration conf, long timeout) throws TTransportException {
 +    return getClient(new TabletClientService.Client.Factory(), address, Property.TSERV_CLIENTPORT, timeout, conf);
 +  }
 +
 +  public static void execute(String address, AccumuloConfiguration conf, ClientExec<TabletClientService.Client> exec) throws AccumuloException,
 +      AccumuloSecurityException {
 +    while (true) {
 +      TabletClientService.Client client = null;
 +      try {
 +        exec.execute(client = getTServerClient(address, conf));
 +        break;
 +      } catch (TTransportException tte) {
 +        log.debug("getTServerClient request failed, retrying ... ", tte);
 +        UtilWaitThread.sleep(100);
 +      } catch (ThriftSecurityException e) {
 +        throw new AccumuloSecurityException(e.user, e.code, e);
 +      } catch (Exception e) {
 +        throw new AccumuloException(e);
 +      } finally {
 +        if (client != null)
 +          returnClient(client);
 +      }
 +    }
 +  }
 +  
 +  public static <T> T execute(String address, AccumuloConfiguration conf, ClientExecReturn<T,TabletClientService.Client> exec) throws AccumuloException,
 +      AccumuloSecurityException {
 +    while (true) {
 +      TabletClientService.Client client = null;
 +      try {
 +        return exec.execute(client = getTServerClient(address, conf));
 +      } catch (TTransportException tte) {
 +        log.debug("getTServerClient request failed, retrying ... ", tte);
 +        UtilWaitThread.sleep(100);
 +      } catch (ThriftSecurityException e) {
 +        throw new AccumuloSecurityException(e.user, e.code, e);
 +      } catch (Exception e) {
 +        throw new AccumuloException(e);
 +      } finally {
 +        if (client != null)
 +          returnClient(client);
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * create a transport that is not pooled
 +   */
 +  public static TTransport createTransport(String address, int port, AccumuloConfiguration conf) throws TException {
 +    TTransport transport = null;
 +    
 +    try {
 +      transport = TTimeoutTransport.create(org.apache.accumulo.core.util.AddressUtil.parseAddress(address, port),
 +          conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
 +      transport = ThriftUtil.transportFactory().getTransport(transport);
 +      transport.open();
 +      TTransport tmp = transport;
 +      transport = null;
 +      return tmp;
 +    } catch (IOException ex) {
 +      throw new TTransportException(ex);
 +    } finally {
 +      if (transport != null)
 +        transport.close();
 +    }
 +    
 +
 +  }
 +
 +  /**
 +   * create a transport that is not pooled
 +   */
 +  public static TTransport createTransport(InetSocketAddress address, AccumuloConfiguration conf) throws TException {
 +    return createTransport(address.getAddress().getHostAddress(), address.getPort(), conf);
 +  }
 +
 +  public static TTransportFactory transportFactory() {
 +    return transportFactory;
 +  }
 +  
 +  private final static Map<Integer,TTransportFactory> factoryCache = new HashMap<Integer,TTransportFactory>();
 +  synchronized public static TTransportFactory transportFactory(int maxFrameSize) {
 +    TTransportFactory factory = factoryCache.get(maxFrameSize);
 +    if(factory == null)
 +    {
 +      factory = new TFramedTransport.Factory(maxFrameSize);
 +      factoryCache.put(maxFrameSize,factory);
 +    }
 +    return factory;
 +  }
 +
 +  synchronized public static TTransportFactory transportFactory(long maxFrameSize) {
 +    if(maxFrameSize > Integer.MAX_VALUE || maxFrameSize < 1)
 +      throw new RuntimeException("Thrift transport frames are limited to "+Integer.MAX_VALUE);
 +    return transportFactory((int)maxFrameSize);
 +  }
 +
 +  public static TProtocolFactory protocolFactory() {
 +    return protocolFactory;
 +  }
++  
++  public static void close() {
++    ThriftTransportPool.close();
++  }
 +}


[6/9] git commit: ACCUMULO-1858 Fixed compilation error from bad merge.

Posted by uj...@apache.org.
ACCUMULO-1858 Fixed compilation error from bad merge.

The old merge&push bit me as I was missing a '}'.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a7c5b72d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a7c5b72d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a7c5b72d

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: a7c5b72d3b5b28775106adf87dab2f76f5c1430e
Parents: 036f381
Author: Bill Slacum <uj...@apache.org>
Authored: Mon Nov 18 14:44:55 2013 -0500
Committer: Bill Slacum <uj...@apache.org>
Committed: Mon Nov 18 14:44:55 2013 -0500

----------------------------------------------------------------------
 .../apache/accumulo/examples/simple/client/RandomBatchWriter.java   | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/a7c5b72d/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
index a640003..845d67d 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
@@ -123,6 +123,7 @@ public class RandomBatchWriter {
     opts.parseArgs(RandomBatchWriter.class.getName(), args, bwOpts);
     if ((opts.max - opts.min) < opts.num) {
       System.err.println(String.format("You must specify a min and a max that allow for at least num possible values. For example, you requested %d rows, but a min of %d and a max of %d only allows for %d rows.", opts.num, opts.min, opts.max, (opts.max - opts.min)));
+    }
     Random r;
     if (opts.seed == null)
       r = new Random();


[7/9] git commit: ACCUMULO-1892 redo resolving conflict resolution from 1.4 merge

Posted by uj...@apache.org.
ACCUMULO-1892 redo resolving conflict resolution from 1.4 merge


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/60dd8bd7
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/60dd8bd7
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/60dd8bd7

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 60dd8bd732ce0596150e49ae06fa7605aa0a15fd
Parents: a7c5b72
Author: Keith Turner <kt...@apache.org>
Authored: Mon Nov 18 15:18:04 2013 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Nov 18 15:20:40 2013 -0500

----------------------------------------------------------------------
 .../examples/simple/client/RandomBatchWriter.java       | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/60dd8bd7/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
index 845d67d..ce91da6 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
@@ -123,6 +123,7 @@ public class RandomBatchWriter {
     opts.parseArgs(RandomBatchWriter.class.getName(), args, bwOpts);
     if ((opts.max - opts.min) < opts.num) {
       System.err.println(String.format("You must specify a min and a max that allow for at least num possible values. For example, you requested %d rows, but a min of %d and a max of %d only allows for %d rows.", opts.num, opts.min, opts.max, (opts.max - opts.min)));
+      System.exit(1);
     }
     Random r;
     if (opts.seed == null)
@@ -130,15 +131,18 @@ public class RandomBatchWriter {
     else {
       r = new Random(opts.seed);
     }
-    
     Connector connector = opts.getConnector();
     BatchWriter bw = connector.createBatchWriter(opts.tableName, bwOpts.getBatchWriterConfig());
     
     // reuse the ColumnVisibility object to improve performance
     ColumnVisibility cv = opts.visiblity;
-    
-    for (int i = 0; i < opts.num; i++) {  
-      long rowid = (Math.abs(r.nextLong()) % (opts.max - opts.min)) + opts.min;
+   
+    // Generate num unique row ids in the given range
+    HashSet<Long> rowids = new HashSet<Long>(opts.num);
+    while (rowids.size() < opts.num) {
+      rowids.add((Math.abs(r.nextLong()) % (opts.max - opts.min)) + opts.min);
+    }
+    for (long rowid : rowids) {
       Mutation m = createMutation(rowid, opts.size, cv);
       bw.addMutation(m);
     }