You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/12/17 00:51:47 UTC

[1/3] accumulo git commit: ACCUMULO-3425 Fix style/formatter/whitespace

Repository: accumulo
Updated Branches:
  refs/heads/master 0354a7c88 -> f3878f5f6


ACCUMULO-3425 Fix style/formatter/whitespace


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/dfb66c35
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/dfb66c35
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/dfb66c35

Branch: refs/heads/master
Commit: dfb66c35358da2bb49704038f4f22ec2eaaaf96e
Parents: 0354a7c
Author: Josh Elser <el...@apache.org>
Authored: Tue Dec 16 15:46:44 2014 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Tue Dec 16 16:42:31 2014 -0500

----------------------------------------------------------------------
 .../core/client/impl/ThriftTransportPool.java   | 187 +++++++++----------
 .../apache/accumulo/core/rpc/ThriftUtil.java    |   6 +-
 .../accumulo/server/rpc/TServerUtils.java       |   7 +-
 3 files changed, 99 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfb66c35/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index bdb04d9..c159e8b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -42,41 +42,41 @@ import com.google.common.net.HostAndPort;
 
 public class ThriftTransportPool {
   private static SecurityPermission TRANSPORT_POOL_PERMISSION = new SecurityPermission("transportPoolPermission");
-  
+
   private static final Random random = new Random();
   private long killTime = 1000 * 3;
-  
+
   private Map<ThriftTransportKey,List<CachedConnection>> cache = new HashMap<ThriftTransportKey,List<CachedConnection>>();
   private Map<ThriftTransportKey,Long> errorCount = new HashMap<ThriftTransportKey,Long>();
   private Map<ThriftTransportKey,Long> errorTime = new HashMap<ThriftTransportKey,Long>();
   private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<ThriftTransportKey>();
 
   private CountDownLatch closerExitLatch;
-  
+
   private static final Logger log = Logger.getLogger(ThriftTransportPool.class);
-  
+
   private static final Long ERROR_THRESHOLD = 20l;
   private static final int STUCK_THRESHOLD = 2 * 60 * 1000;
-  
+
   private static class CachedConnection {
-    
+
     public CachedConnection(CachedTTransport t) {
       this.transport = t;
     }
-    
+
     void setReserved(boolean reserved) {
       this.transport.setReserved(reserved);
     }
-    
+
     boolean isReserved() {
       return this.transport.reserved;
     }
-    
+
     CachedTTransport transport;
-    
+
     long lastReturnTime;
   }
-  
+
   public static class TransportPoolShutdownException extends RuntimeException {
     private static final long serialVersionUID = 1L;
   }
@@ -84,36 +84,36 @@ public class ThriftTransportPool {
   private static class Closer implements Runnable {
     final ThriftTransportPool pool;
     private CountDownLatch closerExitLatch;
-    
+
     public Closer(ThriftTransportPool pool, CountDownLatch closerExitLatch) {
       this.pool = pool;
       this.closerExitLatch = closerExitLatch;
     }
-    
+
     private void closeConnections() {
       while (true) {
-        
+
         ArrayList<CachedConnection> connectionsToClose = new ArrayList<CachedConnection>();
-        
+
         synchronized (pool) {
           for (List<CachedConnection> ccl : pool.getCache().values()) {
             Iterator<CachedConnection> iter = ccl.iterator();
             while (iter.hasNext()) {
               CachedConnection cachedConnection = iter.next();
-              
+
               if (!cachedConnection.isReserved() && System.currentTimeMillis() - cachedConnection.lastReturnTime > pool.killTime) {
                 connectionsToClose.add(cachedConnection);
                 iter.remove();
               }
             }
           }
-          
+
           for (List<CachedConnection> ccl : pool.getCache().values()) {
             for (CachedConnection cachedConnection : ccl) {
               cachedConnection.transport.checkForStuckIO(STUCK_THRESHOLD);
             }
           }
-          
+
           Iterator<Entry<ThriftTransportKey,Long>> iter = pool.errorTime.entrySet().iterator();
           while (iter.hasNext()) {
             Entry<ThriftTransportKey,Long> entry = iter.next();
@@ -124,12 +124,12 @@ public class ThriftTransportPool {
             }
           }
         }
-        
+
         // close connections outside of sync block
         for (CachedConnection cachedConnection : connectionsToClose) {
           cachedConnection.transport.close();
         }
-        
+
         try {
           Thread.sleep(500);
         } catch (InterruptedException e) {
@@ -142,32 +142,31 @@ public class ThriftTransportPool {
     public void run() {
       try {
         closeConnections();
-      } catch (TransportPoolShutdownException e) {
-      } finally {
+      } catch (TransportPoolShutdownException e) {} finally {
         closerExitLatch.countDown();
       }
     }
   }
-  
+
   static class CachedTTransport extends TTransport {
-    
+
     private ThriftTransportKey cacheKey;
     private TTransport wrappedTransport;
     private boolean sawError = false;
-    
+
     private volatile String ioThreadName = null;
     private volatile long ioStartTime = 0;
     private volatile boolean reserved = false;
-    
+
     private String stuckThreadName = null;
-    
+
     int ioCount = 0;
     int lastIoCount = -1;
-    
+
     private void sawError(Exception e) {
       sawError = true;
     }
-    
+
     final void setReserved(boolean reserved) {
       this.reserved = reserved;
       if (reserved) {
@@ -181,22 +180,22 @@ public class ThriftTransportPool {
           log.warn("Connection returned to thrift connection pool that may still be in use " + ioThreadName + " " + Thread.currentThread().getName(),
               new Exception());
         }
-        
+
         ioCount = 0;
         lastIoCount = -1;
         ioThreadName = null;
       }
       checkForStuckIO(STUCK_THRESHOLD);
     }
-    
+
     final void checkForStuckIO(long threshold) {
       /*
        * checking for stuck io needs to be light weight.
-       * 
+       *
        * Tried to call System.currentTimeMillis() and Thread.currentThread() before every io operation.... this dramatically slowed things down. So switched to
        * incrementing a counter before and after each io operation.
        */
-      
+
       if ((ioCount & 1) == 1) {
         // when ioCount is odd, it means I/O is currently happening
         if (ioCount == lastIoCount) {
@@ -212,7 +211,7 @@ public class ThriftTransportPool {
           // if it changes
           lastIoCount = ioCount;
           ioStartTime = System.currentTimeMillis();
-          
+
           if (stuckThreadName != null) {
             // doing I/O, but ioCount changed so no longer stuck
             log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO  to " + cacheKey + " sawError = " + sawError);
@@ -228,17 +227,17 @@ public class ThriftTransportPool {
         }
       }
     }
-    
+
     public CachedTTransport(TTransport transport, ThriftTransportKey cacheKey2) {
       this.wrappedTransport = transport;
       this.cacheKey = cacheKey2;
     }
-    
+
     @Override
     public boolean isOpen() {
       return wrappedTransport.isOpen();
     }
-    
+
     @Override
     public void open() throws TTransportException {
       try {
@@ -251,7 +250,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public int read(byte[] arg0, int arg1, int arg2) throws TTransportException {
       try {
@@ -264,7 +263,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public int readAll(byte[] arg0, int arg1, int arg2) throws TTransportException {
       try {
@@ -277,7 +276,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public void write(byte[] arg0, int arg1, int arg2) throws TTransportException {
       try {
@@ -290,7 +289,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public void write(byte[] arg0) throws TTransportException {
       try {
@@ -303,7 +302,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public void close() {
       try {
@@ -312,9 +311,9 @@ public class ThriftTransportPool {
       } finally {
         ioCount++;
       }
-      
+
     }
-    
+
     @Override
     public void flush() throws TTransportException {
       try {
@@ -327,7 +326,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public boolean peek() {
       try {
@@ -337,7 +336,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public byte[] getBuffer() {
       try {
@@ -347,7 +346,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public int getBufferPosition() {
       try {
@@ -357,7 +356,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public int getBytesRemainingInBuffer() {
       try {
@@ -367,7 +366,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public void consumeBuffer(int len) {
       try {
@@ -377,33 +376,33 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     public ThriftTransportKey getCacheKey() {
       return cacheKey;
     }
-    
+
   }
-  
+
   private ThriftTransportPool() {}
-  
+
   public TTransport getTransportWithDefaultTimeout(HostAndPort addr, ClientContext context) throws TTransportException {
     return getTransport(String.format("%s:%d", addr.getHostText(), addr.getPort()), context.getClientTimeoutInMillis(), context);
   }
-  
+
   public TTransport getTransport(String location, long milliseconds, ClientContext context) throws TTransportException {
     return getTransport(new ThriftTransportKey(location, milliseconds, context));
   }
-  
+
   private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException {
     synchronized (this) {
       // atomically reserve location if it exist in cache
       List<CachedConnection> ccl = getCache().get(cacheKey);
-      
+
       if (ccl == null) {
         ccl = new LinkedList<CachedConnection>();
         getCache().put(cacheKey, ccl);
       }
-      
+
       for (CachedConnection cachedConnection : ccl) {
         if (!cachedConnection.isReserved()) {
           cachedConnection.setReserved(true);
@@ -413,26 +412,26 @@ public class ThriftTransportPool {
         }
       }
     }
-    
+
     return createNewTransport(cacheKey);
   }
-  
+
   Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean preferCachedConnection) throws TTransportException {
-    
+
     servers = new ArrayList<ThriftTransportKey>(servers);
-    
+
     if (preferCachedConnection) {
       HashSet<ThriftTransportKey> serversSet = new HashSet<ThriftTransportKey>(servers);
-      
+
       synchronized (this) {
-        
+
         // randomly pick a server from the connection cache
         serversSet.retainAll(getCache().keySet());
-        
+
         if (serversSet.size() > 0) {
           ArrayList<ThriftTransportKey> cachedServers = new ArrayList<ThriftTransportKey>(serversSet);
           Collections.shuffle(cachedServers, random);
-          
+
           for (ThriftTransportKey ttk : cachedServers) {
             for (CachedConnection cachedConnection : getCache().get(ttk)) {
               if (!cachedConnection.isReserved()) {
@@ -446,12 +445,12 @@ public class ThriftTransportPool {
         }
       }
     }
-    
+
     int retryCount = 0;
     while (servers.size() > 0 && retryCount < 10) {
       int index = random.nextInt(servers.size());
       ThriftTransportKey ttk = servers.get(index);
-      
+
       if (!preferCachedConnection) {
         synchronized (this) {
           List<CachedConnection> cachedConnList = getCache().get(ttk);
@@ -467,7 +466,7 @@ public class ThriftTransportPool {
           }
         }
       }
-      
+
       try {
         return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), createNewTransport(ttk));
       } catch (TTransportException tte) {
@@ -476,22 +475,22 @@ public class ThriftTransportPool {
         retryCount++;
       }
     }
-    
+
     throw new TTransportException("Failed to connect to a server");
   }
-  
+
   private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException {
     TTransport transport = ThriftUtil.createClientTransport(HostAndPort.fromParts(cacheKey.getLocation(), cacheKey.getPort()), (int) cacheKey.getTimeout(),
         cacheKey.getSslParams());
 
     if (log.isTraceEnabled())
       log.trace("Creating new connection to connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
-    
+
     CachedTTransport tsc = new CachedTTransport(transport, cacheKey);
-    
+
     CachedConnection cc = new CachedConnection(tsc);
     cc.setReserved(true);
-    
+
     try {
       synchronized (this) {
         List<CachedConnection> ccl = getCache().get(cacheKey);
@@ -500,7 +499,7 @@ public class ThriftTransportPool {
           ccl = new LinkedList<CachedConnection>();
           getCache().put(cacheKey, ccl);
         }
-      
+
         ccl.add(cc);
       }
     } catch (TransportPoolShutdownException e) {
@@ -509,17 +508,17 @@ public class ThriftTransportPool {
     }
     return cc.transport;
   }
-  
+
   public void returnTransport(TTransport tsc) {
     if (tsc == null) {
       return;
     }
-    
+
     boolean existInCache = false;
     CachedTTransport ctsc = (CachedTTransport) tsc;
-    
+
     ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>();
-    
+
     synchronized (this) {
       List<CachedConnection> ccl = getCache().get(ctsc.getCacheKey());
       for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
@@ -528,33 +527,33 @@ public class ThriftTransportPool {
           if (ctsc.sawError) {
             closeList.add(cachedConnection);
             iterator.remove();
-            
+
             if (log.isTraceEnabled())
               log.trace("Returned connection had error " + ctsc.getCacheKey());
-            
+
             Long ecount = errorCount.get(ctsc.getCacheKey());
             if (ecount == null)
               ecount = 0l;
             ecount++;
             errorCount.put(ctsc.getCacheKey(), ecount);
-            
+
             Long etime = errorTime.get(ctsc.getCacheKey());
             if (etime == null) {
               errorTime.put(ctsc.getCacheKey(), System.currentTimeMillis());
             }
-            
+
             if (ecount >= ERROR_THRESHOLD && !serversWarnedAbout.contains(ctsc.getCacheKey())) {
               log.warn("Server " + ctsc.getCacheKey() + " had " + ecount + " failures in a short time period, will not complain anymore ");
               serversWarnedAbout.add(ctsc.getCacheKey());
             }
-            
+
             cachedConnection.setReserved(false);
-            
+
           } else {
-            
+
             if (log.isTraceEnabled())
               log.trace("Returned connection " + ctsc.getCacheKey() + " ioCount : " + cachedConnection.transport.ioCount);
-            
+
             cachedConnection.lastReturnTime = System.currentTimeMillis();
             cachedConnection.setReserved(false);
           }
@@ -562,7 +561,7 @@ public class ThriftTransportPool {
           break;
         }
       }
-      
+
       // remove all unreserved cached connection when a sever has an error, not just the connection that was returned
       if (ctsc.sawError) {
         for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
@@ -574,7 +573,7 @@ public class ThriftTransportPool {
         }
       }
     }
-    
+
     // close outside of sync block
     for (CachedConnection cachedConnection : closeList) {
       try {
@@ -583,14 +582,14 @@ public class ThriftTransportPool {
         log.debug("Failed to close connection w/ errors", e);
       }
     }
-    
+
     if (!existInCache) {
       log.warn("Returned tablet server connection to cache that did not come from cache");
       // close outside of sync block
       tsc.close();
     }
   }
-  
+
   /**
    * Set the time after which idle connections should be closed
    */
@@ -598,16 +597,16 @@ public class ThriftTransportPool {
     this.killTime = time;
     log.debug("Set thrift transport pool idle time to " + time);
   }
-  
+
   private static ThriftTransportPool instance = new ThriftTransportPool();
   private static final AtomicBoolean daemonStarted = new AtomicBoolean(false);
-  
+
   public static ThriftTransportPool getInstance() {
     SecurityManager sm = System.getSecurityManager();
     if (sm != null) {
       sm.checkPermission(TRANSPORT_POOL_PERMISSION);
     }
-    
+
     if (daemonStarted.compareAndSet(false, true)) {
       CountDownLatch closerExitLatch = new CountDownLatch(1);
       new Daemon(new Closer(instance, closerExitLatch), "Thrift Connection Pool Checker").start();
@@ -615,7 +614,7 @@ public class ThriftTransportPool {
     }
     return instance;
   }
-  
+
   private synchronized void setCloserExitLatch(CountDownLatch closerExitLatch) {
     this.closerExitLatch = closerExitLatch;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfb66c35/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index a3cb252..7f4609b 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@ -105,8 +105,7 @@ public class ThriftUtil {
     return factory.getClient(protocolFactory.getProtocol(transport), protocolFactory.getProtocol(transport));
   }
 
-  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context)
-      throws TTransportException {
+  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context) throws TTransportException {
     return createClient(factory, ThriftTransportPool.getInstance().getTransportWithDefaultTimeout(address, context));
   }
 
@@ -115,8 +114,7 @@ public class ThriftUtil {
     return getClient(factory, address, context, 0);
   }
 
-  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, ClientContext context)
-      throws TTransportException {
+  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, ClientContext context) throws TTransportException {
     TTransport transport = ThriftTransportPool.getInstance().getTransport(address, context.getClientTimeoutInMillis(), context);
     return createClient(factory, transport);
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfb66c35/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 1bd337c..9a74357 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -193,8 +193,9 @@ public class TServerUtils {
     return new ServerAddress(createThreadPoolServer(transport, processor), address);
   }
 
-  public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads, int numSTThreads,
-      long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
+  public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, TProcessor processor, String serverName, String threadName,
+      int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout)
+      throws TTransportException {
     return startTServer(address, new TimedProcessor(conf, processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads,
         timeBetweenThreadChecks, maxMessageSize, sslParams, sslSocketTimeout);
   }
@@ -205,7 +206,7 @@ public class TServerUtils {
    * @return A ServerAddress encapsulating the Thrift server created and the host/port which it is bound to.
    */
   public static ServerAddress startTServer(HostAndPort address, TimedProcessor processor, String serverName, String threadName, int numThreads,
-    int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
+      int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
 
     ServerAddress serverAddress;
     if (sslParams != null) {


[3/3] accumulo git commit: ACCUMULO-3425 Add documentation, remove inner classes, reorder methods.

Posted by el...@apache.org.
ACCUMULO-3425 Add documentation, remove inner classes, reorder methods.

Overall, clean up ThriftUtil and TServerUtils so that they are no longer
monstrous classes full of scattered factory methods, undocumented
methods and odd interplay. Ensures that ThriftUtil only deals with
client-facing or client-and-server-facing code, while TServerUtils deals
with server-facing code only.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f3878f5f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f3878f5f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f3878f5f

Branch: refs/heads/master
Commit: f3878f5f677b3c80d69b7b7c8f3c351f8e5e77d1
Parents: 0433e03
Author: Josh Elser <el...@apache.org>
Authored: Tue Dec 16 18:37:57 2014 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Tue Dec 16 18:41:34 2014 -0500

----------------------------------------------------------------------
 .../core/client/impl/ThriftTransportPool.java   |   4 +-
 .../rpc/ProtocolOverridingSSLSocketFactory.java | 103 ++++++
 .../apache/accumulo/core/rpc/ThriftUtil.java    | 322 +++++++++----------
 .../apache/accumulo/core/rpc/TraceProtocol.java |  47 +++
 .../accumulo/core/rpc/TraceProtocolFactory.java |  33 ++
 .../accumulo/server/rpc/TServerUtils.java       | 102 +++++-
 6 files changed, 432 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3878f5f/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index 7a6e6ab..1220850 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -385,8 +385,8 @@ public class ThriftTransportPool {
 
   private ThriftTransportPool() {}
 
-  public TTransport getTransportWithDefaultTimeout(HostAndPort addr, ClientContext context) throws TTransportException {
-    return getTransport(String.format("%s:%d", addr.getHostText(), addr.getPort()), context.getClientTimeoutInMillis(), context);
+  public TTransport getTransportWithDefaultTimeout(String addr, ClientContext context) throws TTransportException {
+    return getTransport(addr, context.getClientTimeoutInMillis(), context);
   }
 
   public TTransport getTransport(String location, long milliseconds, ClientContext context) throws TTransportException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3878f5f/core/src/main/java/org/apache/accumulo/core/rpc/ProtocolOverridingSSLSocketFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ProtocolOverridingSSLSocketFactory.java b/core/src/main/java/org/apache/accumulo/core/rpc/ProtocolOverridingSSLSocketFactory.java
new file mode 100644
index 0000000..cc8ca95
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ProtocolOverridingSSLSocketFactory.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * JDK6's SSLSocketFactory doesn't seem to properly set the protocols on the Sockets that it creates which causes an SSLv2 client hello message during
+ * handshake, even when only TLSv1 is enabled. This only appears to be an issue on the client sockets, not the server sockets.
+ *
+ * This class wraps a delegate SSLSocketFactory and sets the configured enabled protocols on every {@link SSLSocket} that the delegate creates. See
+ * http://www.coderanch.com/t/637177/Security/Disabling-handshake-message-Java
+ *
+ * This class can be removed once JDK6 is no longer officially supported by Accumulo.
+ */
+class ProtocolOverridingSSLSocketFactory extends SSLSocketFactory {
+
+  private final SSLSocketFactory delegate;
+  private final String[] enabledProtocols;
+
+  public ProtocolOverridingSSLSocketFactory(final SSLSocketFactory delegate, final String[] enabledProtocols) {
+    Preconditions.checkNotNull(enabledProtocols);
+    Preconditions.checkArgument(0 != enabledProtocols.length, "Expected at least one protocol");
+    this.delegate = delegate;
+    this.enabledProtocols = enabledProtocols;
+  }
+
+  @Override
+  public String[] getDefaultCipherSuites() {
+    return delegate.getDefaultCipherSuites();
+  }
+
+  @Override
+  public String[] getSupportedCipherSuites() {
+    return delegate.getSupportedCipherSuites();
+  }
+
+  @Override
+  public Socket createSocket(final Socket socket, final String host, final int port, final boolean autoClose) throws IOException {
+    final Socket underlyingSocket = delegate.createSocket(socket, host, port, autoClose);
+    return overrideProtocol(underlyingSocket);
+  }
+
+  @Override
+  public Socket createSocket(final String host, final int port) throws IOException, UnknownHostException {
+    final Socket underlyingSocket = delegate.createSocket(host, port);
+    return overrideProtocol(underlyingSocket);
+  }
+
+  @Override
+  public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort) throws IOException, UnknownHostException {
+    final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort);
+    return overrideProtocol(underlyingSocket);
+  }
+
+  @Override
+  public Socket createSocket(final InetAddress host, final int port) throws IOException {
+    final Socket underlyingSocket = delegate.createSocket(host, port);
+    return overrideProtocol(underlyingSocket);
+  }
+
+  @Override
+  public Socket createSocket(final InetAddress host, final int port, final InetAddress localAddress, final int localPort) throws IOException {
+    final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort);
+    return overrideProtocol(underlyingSocket);
+  }
+
+  /**
+   * Set the {@link javax.net.ssl.SSLSocket#setEnabledProtocols(String[]) enabled protocols} to {@link #enabledProtocols} if the <code>socket</code> is a
+   * {@link SSLSocket}; any other socket is left unchanged. The same <code>socket</code> instance is returned in both cases.
+   *
+   * @param socket
+   *          The Socket created by the delegate factory
+   */
+  private Socket overrideProtocol(final Socket socket) {
+    if (socket instanceof SSLSocket) {
+      ((SSLSocket) socket).setEnabledProtocols(enabledProtocols);
+    }
+    return socket;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3878f5f/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index 7f4609b..c95a62b 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@ -18,20 +18,12 @@ package org.apache.accumulo.core.rpc;
 
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.UnknownHostException;
 import java.security.KeyStore;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
 import javax.net.ssl.SSLSocket;
 import javax.net.ssl.SSLSocketFactory;
 import javax.net.ssl.TrustManagerFactory;
@@ -44,101 +36,168 @@ import org.apache.accumulo.core.client.impl.ClientExecReturn;
 import org.apache.accumulo.core.client.impl.ThriftTransportPool;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
-import org.apache.accumulo.core.trace.Span;
-import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.thrift.TServiceClient;
 import org.apache.thrift.TServiceClientFactory;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TMessage;
-import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSSLTransportFactory;
-import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 
-import com.google.common.base.Preconditions;
 import com.google.common.net.HostAndPort;
 
+/**
+ * Factory methods for creating Thrift client objects
+ */
 public class ThriftUtil {
   private static final Logger log = Logger.getLogger(ThriftUtil.class);
 
-  public static class TraceProtocol extends TCompactProtocol {
-    private Span span = null;
-
-    @Override
-    public void writeMessageBegin(TMessage message) throws TException {
-      span = Trace.start("client:" + message.name);
-      super.writeMessageBegin(message);
-    }
+  private static final TraceProtocolFactory protocolFactory = new TraceProtocolFactory();
+  private static final TFramedTransport.Factory transportFactory = new TFramedTransport.Factory(Integer.MAX_VALUE);
+  private static final Map<Integer,TTransportFactory> factoryCache = new HashMap<Integer,TTransportFactory>();
 
-    @Override
-    public void writeMessageEnd() throws TException {
-      super.writeMessageEnd();
-      span.stop();
-    }
-
-    public TraceProtocol(TTransport transport) {
-      super(transport);
-    }
+  /**
+   * An instance of {@link TraceProtocolFactory}
+   *
+   * @return The default Thrift TProtocolFactory for RPC
+   */
+  public static TProtocolFactory protocolFactory() {
+    return protocolFactory;
   }
 
-  public static class TraceProtocolFactory extends TCompactProtocol.Factory {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public TProtocol getProtocol(TTransport trans) {
-      return new TraceProtocol(trans);
-    }
+  /**
+   * An instance of {@link TFramedTransport.Factory}
+   *
+   * @return The default Thrift TTransportFactory for RPC
+   */
+  public static TTransportFactory transportFactory() {
+    return transportFactory;
   }
 
-  static private TProtocolFactory protocolFactory = new TraceProtocolFactory();
-  static private TTransportFactory transportFactory = new TFramedTransport.Factory(Integer.MAX_VALUE);
-
-  static public <T extends TServiceClient> T createClient(TServiceClientFactory<T> factory, TTransport transport) {
+  /**
+   * Create a Thrift client using the given factory and transport
+   */
+  public static <T extends TServiceClient> T createClient(TServiceClientFactory<T> factory, TTransport transport) {
     return factory.getClient(protocolFactory.getProtocol(transport), protocolFactory.getProtocol(transport));
   }
 
-  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context) throws TTransportException {
-    return createClient(factory, ThriftTransportPool.getInstance().getTransportWithDefaultTimeout(address, context));
+  /**
+   * Create a Thrift client using the given factory with a pooled transport (if available), the address and client context
+   *
+   * @param factory
+   *          Thrift client factory
+   * @param address
+   *          Server address for client to connect to
+   * @param context
+   *          RPC options
+   */
+  public static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context) throws TTransportException {
+    return createClient(factory, ThriftTransportPool.getInstance().getTransportWithDefaultTimeout(address.toString(), context));
   }
 
-  static public <T extends TServiceClient> T getClientNoTimeout(TServiceClientFactory<T> factory, String address, ClientContext context)
+  /**
+   * Create a Thrift client using the given factory with a pooled transport (if available), the address, and client context with no timeout.
+   *
+   * @param factory
+   *          Thrift client factory
+   * @param address
+   *          Server address for client to connect to
+   * @param context
+   *          RPC options
+   */
+  public static <T extends TServiceClient> T getClientNoTimeout(TServiceClientFactory<T> factory, String address, ClientContext context)
       throws TTransportException {
     return getClient(factory, address, context, 0);
   }
 
-  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, ClientContext context) throws TTransportException {
+  /**
+   * Create a Thrift client using the given factory with a pooled transport (if available), the address and client context. Client timeout is extracted from the
+   * ClientContext
+   *
+   * @param factory
+   *          Thrift client factory
+   * @param address
+   *          Server address for client to connect to
+   * @param context
+   *          RPC options
+   */
+  public static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, ClientContext context) throws TTransportException {
     TTransport transport = ThriftTransportPool.getInstance().getTransport(address, context.getClientTimeoutInMillis(), context);
     return createClient(factory, transport);
   }
 
-  static private <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, ClientContext context, long timeout)
+  /**
+   * Create a Thrift client using the given factory with a pooled transport (if available) using the address, client context and timeout
+   *
+   * @param factory
+   *          Thrift client factory
+   * @param address
+   *          Server address for client to connect to
+   * @param context
+   *          RPC options
+   * @param timeout
+   *          Socket timeout which overrides the ClientContext timeout
+   */
+  private static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, ClientContext context, long timeout)
       throws TTransportException {
     TTransport transport = ThriftTransportPool.getInstance().getTransport(address, timeout, context);
     return createClient(factory, transport);
   }
 
-  static public void returnClient(TServiceClient iface) { // Eew... the typing here is horrible
+  /**
+   * Return the transport used by the client to the shared pool.
+   *
+   * @param iface
+   *          The Client being returned or null.
+   */
+  public static void returnClient(TServiceClient iface) { // Eew... the typing here is horrible
     if (iface != null) {
       ThriftTransportPool.getInstance().returnTransport(iface.getInputProtocol().getTransport());
     }
   }
 
-  static public TabletClientService.Client getTServerClient(String address, ClientContext context) throws TTransportException {
+  /**
+   * Create a TabletServer Thrift client
+   *
+   * @param address
+   *          Server address for client to connect to
+   * @param context
+   *          RPC options
+   */
+  public static TabletClientService.Client getTServerClient(String address, ClientContext context) throws TTransportException {
     return getClient(new TabletClientService.Client.Factory(), address, context);
   }
 
-  static public TabletClientService.Client getTServerClient(String address, ClientContext context, long timeout) throws TTransportException {
+  /**
+   * Create a TabletServer Thrift client
+   *
+   * @param address
+   *          Server address for client to connect to
+   * @param context
+   *          Options for connecting to the server
+   * @param timeout
+   *          Socket timeout which overrides the ClientContext timeout
+   */
+  public static TabletClientService.Client getTServerClient(String address, ClientContext context, long timeout) throws TTransportException {
     return getClient(new TabletClientService.Client.Factory(), address, context, timeout);
   }
 
+  /**
+   * Execute the provided closure against a TabletServer at the given address. If a Thrift transport exception occurs, the operation will be automatically
+   * retried.
+   *
+   * @param address
+   *          TabletServer address
+   * @param context
+   *          RPC options
+   * @param exec
+   *          The closure to execute
+   */
   public static void execute(String address, ClientContext context, ClientExec<TabletClientService.Client> exec) throws AccumuloException,
       AccumuloSecurityException {
     while (true) {
@@ -160,6 +219,18 @@ public class ThriftUtil {
     }
   }
 
+  /**
+   * Execute the provided closure against the TabletServer at the given address, and return the result of the closure to the client. If a Thrift transport
+   * exception occurs, the operation will be automatically retried.
+   *
+   * @param address
+   *          TabletServer address
+   * @param context
+   *          RPC options
+   * @param exec
+   *          Closure with a return value to execute
+   * @return The result from the closure
+   */
   public static <T> T execute(String address, ClientContext context, ClientExecReturn<T,TabletClientService.Client> exec) throws AccumuloException,
       AccumuloSecurityException {
     while (true) {
@@ -181,19 +252,25 @@ public class ThriftUtil {
   }
 
   /**
-   * create a transport that is not pooled
+   * Create a transport that is not pooled
+   *
+   * @param address
+   *          Server address to open the transport to
+   * @param context
+   *          RPC options
    */
   public static TTransport createTransport(HostAndPort address, ClientContext context) throws TException {
     return createClientTransport(address, (int) context.getClientTimeoutInMillis(), context.getClientSslParams());
   }
 
-  public static TTransportFactory transportFactory() {
-    return transportFactory;
-  }
-
-  private final static Map<Integer,TTransportFactory> factoryCache = new HashMap<Integer,TTransportFactory>();
-
-  synchronized public static TTransportFactory transportFactory(int maxFrameSize) {
+  /**
+   * Get an instance of the TTransportFactory with the provided maximum frame size
+   *
+   * @param maxFrameSize
+   *          Maximum Thrift message frame size
+   * @return A, possibly cached, TTransportFactory with the requested maximum frame size
+   */
+  public static synchronized TTransportFactory transportFactory(int maxFrameSize) {
     TTransportFactory factory = factoryCache.get(maxFrameSize);
     if (factory == null) {
       factory = new TFramedTransport.Factory(maxFrameSize);
@@ -202,47 +279,26 @@ public class ThriftUtil {
     return factory;
   }
 
-  synchronized public static TTransportFactory transportFactory(long maxFrameSize) {
+  /**
+   * @see #transportFactory(int)
+   */
+  public static synchronized TTransportFactory transportFactory(long maxFrameSize) {
     if (maxFrameSize > Integer.MAX_VALUE || maxFrameSize < 1)
       throw new RuntimeException("Thrift transport frames are limited to " + Integer.MAX_VALUE);
     return transportFactory((int) maxFrameSize);
   }
 
-  public static TProtocolFactory protocolFactory() {
-    return protocolFactory;
-  }
-
-  public static TServerSocket getServerSocket(int port, int timeout, InetAddress address, SslConnectionParams params) throws TTransportException {
-    TServerSocket tServerSock;
-    if (params.useJsse()) {
-      tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(), address);
-    } else {
-      tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, address, params.getTTransportParams());
-    }
-
-    ServerSocket serverSock = tServerSock.getServerSocket();
-    if (serverSock instanceof SSLServerSocket) {
-      SSLServerSocket sslServerSock = (SSLServerSocket) serverSock;
-      String[] protocols = params.getServerProtocols();
-
-      // Be nice for the user and automatically remove protocols that might not exist in their JVM. Keeps us from forcing config alterations too
-      // e.g. TLSv1.1 and TLSv1.2 don't exist in JDK6
-      Set<String> socketEnabledProtocols = new HashSet<String>(Arrays.asList(sslServerSock.getEnabledProtocols()));
-      // Keep only the enabled protocols that were specified by the configuration
-      socketEnabledProtocols.retainAll(Arrays.asList(protocols));
-      if (socketEnabledProtocols.isEmpty()) {
-        // Bad configuration...
-        throw new RuntimeException("No available protocols available for secure socket. Availaable protocols: "
-            + Arrays.toString(sslServerSock.getEnabledProtocols()) + ", allowed protocols: " + Arrays.toString(protocols));
-      }
-
-      // Set the protocol(s) on the server socket
-      sslServerSock.setEnabledProtocols(socketEnabledProtocols.toArray(new String[0]));
-    }
-
-    return tServerSock;
-  }
-
+  /**
+   * Create a TTransport for clients to the given address with the provided socket timeout and session-layer configuration
+   *
+   * @param address
+   *          Server address to connect to
+   * @param timeout
+   *          Client socket timeout
+   * @param sslParams
+   *          RPC options for SSL servers
+   * @return An open TTransport which must be closed when finished
+   */
   public static TTransport createClientTransport(HostAndPort address, int timeout, SslConnectionParams sslParams) throws TTransportException {
     boolean success = false;
     TTransport transport = null;
@@ -353,80 +409,4 @@ public class ThriftUtil {
       throw new TTransportException("Could not connect to " + host + " on port " + port, e);
     }
   }
-
-  /**
-   * JDK6's SSLSocketFactory doesn't seem to properly set the protocols on the Sockets that it creates which causes an SSLv2 client hello message during
-   * handshake, even when only TLSv1 is enabled. This only appears to be an issue on the client sockets, not the server sockets.
-   *
-   * This class wraps the SSLSocketFactory ensuring that the Socket is properly configured.
-   * http://www.coderanch.com/t/637177/Security/Disabling-handshake-message-Java
-   *
-   * This class can be removed when JDK6 support is officially unsupported by Accumulo
-   */
-  private static class ProtocolOverridingSSLSocketFactory extends SSLSocketFactory {
-
-    private final SSLSocketFactory delegate;
-    private final String[] enabledProtocols;
-
-    public ProtocolOverridingSSLSocketFactory(final SSLSocketFactory delegate, final String[] enabledProtocols) {
-      Preconditions.checkNotNull(enabledProtocols);
-      Preconditions.checkArgument(0 != enabledProtocols.length, "Expected at least one protocol");
-      this.delegate = delegate;
-      this.enabledProtocols = enabledProtocols;
-    }
-
-    @Override
-    public String[] getDefaultCipherSuites() {
-      return delegate.getDefaultCipherSuites();
-    }
-
-    @Override
-    public String[] getSupportedCipherSuites() {
-      return delegate.getSupportedCipherSuites();
-    }
-
-    @Override
-    public Socket createSocket(final Socket socket, final String host, final int port, final boolean autoClose) throws IOException {
-      final Socket underlyingSocket = delegate.createSocket(socket, host, port, autoClose);
-      return overrideProtocol(underlyingSocket);
-    }
-
-    @Override
-    public Socket createSocket(final String host, final int port) throws IOException, UnknownHostException {
-      final Socket underlyingSocket = delegate.createSocket(host, port);
-      return overrideProtocol(underlyingSocket);
-    }
-
-    @Override
-    public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort) throws IOException, UnknownHostException {
-      final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort);
-      return overrideProtocol(underlyingSocket);
-    }
-
-    @Override
-    public Socket createSocket(final InetAddress host, final int port) throws IOException {
-      final Socket underlyingSocket = delegate.createSocket(host, port);
-      return overrideProtocol(underlyingSocket);
-    }
-
-    @Override
-    public Socket createSocket(final InetAddress host, final int port, final InetAddress localAddress, final int localPort) throws IOException {
-      final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort);
-      return overrideProtocol(underlyingSocket);
-    }
-
-    /**
-     * Set the {@link javax.net.ssl.SSLSocket#getEnabledProtocols() enabled protocols} to {@link #enabledProtocols} if the <code>socket</code> is a
-     * {@link SSLSocket}
-     *
-     * @param socket
-     *          The Socket
-     */
-    private Socket overrideProtocol(final Socket socket) {
-      if (socket instanceof SSLSocket) {
-        ((SSLSocket) socket).setEnabledProtocols(enabledProtocols);
-      }
-      return socket;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3878f5f/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocol.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocol.java b/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocol.java
new file mode 100644
index 0000000..74aad57
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocol.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import org.apache.accumulo.core.trace.Span;
+import org.apache.accumulo.core.trace.Trace;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * TCompactProtocol implementation which automatically tracks tracing information
+ */
+public class TraceProtocol extends TCompactProtocol {
+  private Span span = null;
+
+  @Override
+  public void writeMessageBegin(TMessage message) throws TException {
+    span = Trace.start("client:" + message.name);
+    super.writeMessageBegin(message);
+  }
+
+  @Override
+  public void writeMessageEnd() throws TException {
+    super.writeMessageEnd();
+    span.stop();
+  }
+
+  public TraceProtocol(TTransport transport) {
+    super(transport);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3878f5f/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java b/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java
new file mode 100644
index 0000000..4591aa6
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * {@link TCompactProtocol.Factory} implementation which injects {@link TraceProtocol} instead of {@link TCompactProtocol}
+ */
+public class TraceProtocolFactory extends TCompactProtocol.Factory {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public TProtocol getProtocol(TTransport trans) {
+    return new TraceProtocol(trans);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3878f5f/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index d972b9a..210bcf5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -20,11 +20,17 @@ import java.lang.reflect.Field;
 import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 
+import javax.net.ssl.SSLServerSocket;
+
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.rpc.SslConnectionParams;
@@ -40,6 +46,8 @@ import org.apache.thrift.TProcessor;
 import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
@@ -47,9 +55,15 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.net.HostAndPort;
 
+/**
+ * Factory methods for creating Thrift server objects
+ */
 public class TServerUtils {
   private static final Logger log = LoggerFactory.getLogger(TServerUtils.class);
 
+  /**
+   * Static instance, passed to {@link ClientInfoProcessorFactory}, which will contain the client address of any incoming RPC.
+   */
   public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
 
   /**
@@ -83,6 +97,7 @@ public class TServerUtils {
     boolean portSearch = false;
     if (portSearchProperty != null)
       portSearch = service.getConfiguration().getBoolean(portSearchProperty);
+
     // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once
     TimedProcessor timedProcessor = new TimedProcessor(service.getConfiguration(), processor, serverName, threadName);
     Random random = new Random();
@@ -135,9 +150,8 @@ public class TServerUtils {
     options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
     options.maxReadBufferBytes = maxMessageSize;
     options.stopTimeoutVal(5);
-    /*
-     * Create our own very special thread pool.
-     */
+
+    // Create our own very special thread pool.
     final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
     // periodically adjust the number of threads we need by checking how busy our threads are
     SimpleTimer.getInstance(numSTThreads).schedule(new Runnable() {
@@ -171,7 +185,16 @@ public class TServerUtils {
     return new ServerAddress(new CustomNonBlockingServer(options), address);
   }
 
-  public static TServer createThreadPoolServer(TServerTransport transport, TProcessor processor) {
+  /**
+   * Create a TThreadPoolServer with the given transport and processor
+   *
+   * @param transport
+   *          TServerTransport for the server
+   * @param processor
+   *          TProcessor for the server
+   * @return A configured TThreadPoolServer
+   */
+  public static TThreadPoolServer createThreadPoolServer(TServerTransport transport, TProcessor processor) {
     TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
     options.protocolFactory(ThriftUtil.protocolFactory());
     options.transportFactory(ThriftUtil.transportFactory());
@@ -179,11 +202,70 @@ public class TServerUtils {
     return new TThreadPoolServer(options);
   }
 
+  /**
+   * Create the Thrift server socket for RPC running over SSL.
+   *
+   * @param port
+   *          Port of the server socket to bind to
+   * @param timeout
+   *          Socket timeout
+   * @param address
+   *          Address to bind the socket to
+   * @param params
+   *          SSL parameters
+   * @return A TServerSocket configured to use SSL
+   * @throws TTransportException
+   */
+  public static TServerSocket getSslServerSocket(int port, int timeout, InetAddress address, SslConnectionParams params) throws TTransportException {
+    TServerSocket tServerSock;
+    if (params.useJsse()) {
+      tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(), address);
+    } else {
+      tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, address, params.getTTransportParams());
+    }
+
+    ServerSocket serverSock = tServerSock.getServerSocket();
+    if (serverSock instanceof SSLServerSocket) {
+      SSLServerSocket sslServerSock = (SSLServerSocket) serverSock;
+      String[] protocols = params.getServerProtocols();
+
+      // Be nice for the user and automatically remove protocols that might not exist in their JVM. Keeps us from forcing config alterations too
+      // e.g. TLSv1.1 and TLSv1.2 don't exist in JDK6
+      Set<String> socketEnabledProtocols = new HashSet<String>(Arrays.asList(sslServerSock.getEnabledProtocols()));
+      // Keep only the enabled protocols that were specified by the configuration
+      socketEnabledProtocols.retainAll(Arrays.asList(protocols));
+      if (socketEnabledProtocols.isEmpty()) {
+        // Bad configuration...
+        throw new RuntimeException("No available protocols available for secure socket. Availaable protocols: "
+            + Arrays.toString(sslServerSock.getEnabledProtocols()) + ", allowed protocols: " + Arrays.toString(protocols));
+      }
+
+      // Set the protocol(s) on the server socket
+      sslServerSock.setEnabledProtocols(socketEnabledProtocols.toArray(new String[0]));
+    }
+
+    return tServerSock;
+  }
+
+  /**
+   * Create a Thrift SSL server.
+   *
+   * @param address
+   *          host and port to bind to
+   * @param processor
+   *          TProcessor for the server
+   * @param socketTimeout
+   *          Socket timeout
+   * @param sslParams
+   *          SSL parameters
+   * @return A ServerAddress with the bound-socket information and the Thrift server
+   * @throws TTransportException
+   */
   public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams)
       throws TTransportException {
     org.apache.thrift.transport.TServerSocket transport;
     try {
-      transport = ThriftUtil.getServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()), sslParams);
+      transport = getSslServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()), sslParams);
     } catch (UnknownHostException e) {
       throw new TTransportException(e);
     }
@@ -193,6 +275,9 @@ public class TServerUtils {
     return new ServerAddress(createThreadPoolServer(transport, processor), address);
   }
 
+  /**
+   * Create a Thrift server given the provided arguments and Accumulo configuration.
+   */
   public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, TProcessor processor, String serverName, String threadName,
       int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout)
       throws TTransportException {
@@ -240,7 +325,12 @@ public class TServerUtils {
     return serverAddress;
   }
 
-  // Existing connections will keep our thread running: reach in with reflection and insist that they shutdown.
+  /**
+   * Stop a Thrift TServer. Existing connections will keep our thread running; use reflection to forcibly shut down the threadpool.
+   *
+   * @param s
+   *          The TServer to stop
+   */
   public static void stopTServer(TServer s) {
     if (s == null)
       return;


[2/3] accumulo git commit: ACCUMULO-3425 Use slf4j logger variable substitution.

Posted by el...@apache.org.
ACCUMULO-3425 Use slf4j logger variable substitution.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0433e037
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0433e037
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0433e037

Branch: refs/heads/master
Commit: 0433e0374a179ba64b94cbdb544ac1f803f620d8
Parents: dfb66c3
Author: Josh Elser <el...@apache.org>
Authored: Tue Dec 16 16:56:16 2014 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Tue Dec 16 16:56:16 2014 -0500

----------------------------------------------------------------------
 .../core/client/impl/ThriftTransportPool.java   | 39 ++++++++------------
 .../accumulo/server/rpc/TServerUtils.java       |  8 ++--
 2 files changed, 20 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0433e037/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index c159e8b..7a6e6ab 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -34,9 +34,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.Pair;
-import org.apache.log4j.Logger;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.net.HostAndPort;
 
@@ -53,7 +54,7 @@ public class ThriftTransportPool {
 
   private CountDownLatch closerExitLatch;
 
-  private static final Logger log = Logger.getLogger(ThriftTransportPool.class);
+  private static final Logger log = LoggerFactory.getLogger(ThriftTransportPool.class);
 
   private static final Long ERROR_THRESHOLD = 20l;
   private static final int STUCK_THRESHOLD = 2 * 60 * 1000;
@@ -175,9 +176,8 @@ public class ThriftTransportPool {
         lastIoCount = -1;
       } else {
         if ((ioCount & 1) == 1) {
-          // connection unreserved, but it seems io may still be
-          // happening
-          log.warn("Connection returned to thrift connection pool that may still be in use " + ioThreadName + " " + Thread.currentThread().getName(),
+          // connection unreserved, but it seems io may still be happening
+          log.warn("Connection returned to thrift connection pool that may still be in use {} {}", ioThreadName, Thread.currentThread().getName(),
               new Exception());
         }
 
@@ -204,7 +204,7 @@ public class ThriftTransportPool {
           long delta = System.currentTimeMillis() - ioStartTime;
           if (delta >= threshold && stuckThreadName == null) {
             stuckThreadName = ioThreadName;
-            log.warn("Thread \"" + ioThreadName + "\" stuck on IO  to " + cacheKey + " for at least " + delta + " ms");
+            log.warn("Thread \"{}\" stuck on IO to {} for at least {} ms", ioThreadName, cacheKey, delta);
           }
         } else {
           // remember this ioCount and the time we saw it, need to see
@@ -214,7 +214,7 @@ public class ThriftTransportPool {
 
           if (stuckThreadName != null) {
             // doing I/O, but ioCount changed so no longer stuck
-            log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO  to " + cacheKey + " sawError = " + sawError);
+            log.info("Thread \"{}\" no longer stuck on IO to {} sawError = {}", stuckThreadName, cacheKey, sawError);
             stuckThreadName = null;
           }
         }
@@ -222,7 +222,7 @@ public class ThriftTransportPool {
         // I/O is not currently happening
         if (stuckThreadName != null) {
           // no longer stuck, and was stuck in the past
-          log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO  to " + cacheKey + " sawError = " + sawError);
+          log.info("Thread \"{}\" no longer stuck on IO to {} sawError = {}", stuckThreadName, cacheKey, sawError);
           stuckThreadName = null;
         }
       }
@@ -406,8 +406,7 @@ public class ThriftTransportPool {
       for (CachedConnection cachedConnection : ccl) {
         if (!cachedConnection.isReserved()) {
           cachedConnection.setReserved(true);
-          if (log.isTraceEnabled())
-            log.trace("Using existing connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
+          log.trace("Using existing connection to {}:{}", cacheKey.getLocation(), cacheKey.getPort());
           return cachedConnection.transport;
         }
       }
@@ -436,8 +435,7 @@ public class ThriftTransportPool {
             for (CachedConnection cachedConnection : getCache().get(ttk)) {
               if (!cachedConnection.isReserved()) {
                 cachedConnection.setReserved(true);
-                if (log.isTraceEnabled())
-                  log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort());
+                log.trace("Using existing connection to {}:{}", ttk.getLocation(), ttk.getPort());
                 return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport);
               }
             }
@@ -458,8 +456,7 @@ public class ThriftTransportPool {
             for (CachedConnection cachedConnection : cachedConnList) {
               if (!cachedConnection.isReserved()) {
                 cachedConnection.setReserved(true);
-                if (log.isTraceEnabled())
-                  log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort() + " timeout " + ttk.getTimeout());
+                log.trace("Using existing connection to {}:{} timeout {}", ttk.getLocation(), ttk.getPort(), ttk.getTimeout());
                 return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport);
               }
             }
@@ -483,8 +480,7 @@ public class ThriftTransportPool {
     TTransport transport = ThriftUtil.createClientTransport(HostAndPort.fromParts(cacheKey.getLocation(), cacheKey.getPort()), (int) cacheKey.getTimeout(),
         cacheKey.getSslParams());
 
-    if (log.isTraceEnabled())
-      log.trace("Creating new connection to connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
+    log.trace("Creating new connection to connection to {}:{}", cacheKey.getLocation(), cacheKey.getPort());
 
     CachedTTransport tsc = new CachedTTransport(transport, cacheKey);
 
@@ -528,8 +524,7 @@ public class ThriftTransportPool {
             closeList.add(cachedConnection);
             iterator.remove();
 
-            if (log.isTraceEnabled())
-              log.trace("Returned connection had error " + ctsc.getCacheKey());
+            log.trace("Returned connection had error {}", ctsc.getCacheKey());
 
             Long ecount = errorCount.get(ctsc.getCacheKey());
             if (ecount == null)
@@ -543,16 +538,14 @@ public class ThriftTransportPool {
             }
 
             if (ecount >= ERROR_THRESHOLD && !serversWarnedAbout.contains(ctsc.getCacheKey())) {
-              log.warn("Server " + ctsc.getCacheKey() + " had " + ecount + " failures in a short time period, will not complain anymore ");
+              log.warn("Server {} had {} failures in a short time period, will not complain anymore", ctsc.getCacheKey(), ecount);
               serversWarnedAbout.add(ctsc.getCacheKey());
             }
 
             cachedConnection.setReserved(false);
 
           } else {
-
-            if (log.isTraceEnabled())
-              log.trace("Returned connection " + ctsc.getCacheKey() + " ioCount : " + cachedConnection.transport.ioCount);
+            log.trace("Returned connection {} ioCount: {}", ctsc.getCacheKey(), cachedConnection.transport.ioCount);
 
             cachedConnection.lastReturnTime = System.currentTimeMillis();
             cachedConnection.setReserved(false);
@@ -595,7 +588,7 @@ public class ThriftTransportPool {
    */
   public synchronized void setIdleTime(long time) {
     this.killTime = time;
-    log.debug("Set thrift transport pool idle time to " + time);
+    log.debug("Set thrift transport pool idle time to {}", time);
   }
 
   private static ThriftTransportPool instance = new ThriftTransportPool();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0433e037/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 9a74357..d972b9a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -110,7 +110,7 @@ public class TServerUtils {
             // Note: with a TNonblockingServerSocket a "port taken" exception is a cause-less
             // TTransportException, and with a TSocket created by TSSLTransportFactory, it
             // comes through as caused by a BindException.
-            log.info("Unable to use port " + port + ", retrying. (Thread Name = " + threadName + ")");
+            log.info("Unable to use port {}, retrying. (Thread Name = {})", port, threadName);
             UtilWaitThread.sleep(250);
           } else {
             // thrift is passing up a nested exception that isn't a BindException,
@@ -145,7 +145,7 @@ public class TServerUtils {
       public void run() {
         if (pool.getCorePoolSize() <= pool.getActiveCount()) {
           int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
-          log.info("Increasing server thread pool size on " + serverName + " to " + larger);
+          log.info("Increasing server thread pool size on {} to {}", serverName, larger);
           pool.setMaximumPoolSize(larger);
           pool.setCorePoolSize(larger);
         } else {
@@ -156,7 +156,7 @@ public class TServerUtils {
               // we decrease the core pool size... so the active count could end up higher than
               // the core pool size, in which case everything will be queued... the increase case
               // should handle this and prevent deadlock
-              log.info("Decreasing server thread pool size on " + serverName + " to " + smaller);
+              log.info("Decreasing server thread pool size on {} to {}", serverName, smaller);
               pool.setCorePoolSize(smaller);
             }
           }
@@ -251,7 +251,7 @@ public class TServerUtils {
       ExecutorService es = (ExecutorService) f.get(s);
       es.shutdownNow();
     } catch (Exception e) {
-      TServerUtils.log.error("Unable to call shutdownNow", e);
+      log.error("Unable to call shutdownNow", e);
     }
   }
 }