You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2016/06/21 03:39:51 UTC

[01/28] accumulo git commit: ACCUMULO-4317 Ensure the socket is closed on failure to set up a Thrift client

Repository: accumulo
Updated Branches:
  refs/heads/1.6 31a222425 -> f66b9a0d1
  refs/heads/1.7 976c0aaf9 -> d3e687e93
  refs/heads/1.8 94bf129c8 -> f45a65cb2
  refs/heads/master 5c576af1e -> c2a3b685c


ACCUMULO-4317 Ensure the socket is closed on failure to set up a Thrift client

Signed-off-by: Josh Elser <el...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/561f1893
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/561f1893
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/561f1893

Branch: refs/heads/1.6
Commit: 561f1893b9f239457e16c046702d8ba9df1474e2
Parents: d5ea00f
Author: Michiel Vanderlee <ma...@gmail.com>
Authored: Mon Jun 20 18:09:37 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 18:09:37 2016 -0400

----------------------------------------------------------------------
 .../accumulo/core/util/TTimeoutTransport.java   | 24 ++++++++++++++------
 .../apache/accumulo/core/util/ThriftUtil.java   |  8 ++++++-
 2 files changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/561f1893/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
index 1eac8be..52a9a98 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
@@ -66,12 +66,22 @@ public class TTimeoutTransport {
   }
 
   public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException {
-    Socket socket = SelectorProvider.provider().openSocketChannel().socket();
-    socket.setSoLinger(false, 0);
-    socket.setTcpNoDelay(true);
-    socket.connect(addr);
-    InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
-    OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
-    return new TIOStreamTransport(input, output);
+    Socket socket = null;
+    try {
+      socket = SelectorProvider.provider().openSocketChannel().socket();
+      socket.setSoLinger(false, 0);
+      socket.setTcpNoDelay(true);
+      socket.connect(addr);
+      InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
+      OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
+      return new TIOStreamTransport(input, output);
+    } catch (IOException e) {
+      try {
+        if (socket != null)
+          socket.close();
+      } catch (IOException ioe) {}
+
+      throw e;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/561f1893/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
index 29f7fb7..48ff3f2 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
@@ -349,11 +349,17 @@ public class ThriftUtil {
    *          Socket timeout
    */
   private static TSocket createClient(SSLSocketFactory factory, String host, int port, int timeout) throws TTransportException {
+    SSLSocket socket = null;
     try {
-      SSLSocket socket = (SSLSocket) factory.createSocket(host, port);
+      socket = (SSLSocket) factory.createSocket(host, port);
       socket.setSoTimeout(timeout);
       return new TSocket(socket);
     } catch (Exception e) {
+      try {
+        if (socket != null)
+          socket.close();
+      } catch (IOException ioe) {}
+
       throw new TTransportException("Could not connect to " + host + " on port " + port, e);
     }
   }


[27/28] accumulo git commit: Merge branch '1.7' into 1.8

Posted by el...@apache.org.
Merge branch '1.7' into 1.8


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f45a65cb
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f45a65cb
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f45a65cb

Branch: refs/heads/master
Commit: f45a65cb241e610351ad5e36302bd59ad87755e7
Parents: af80261 d3e687e
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 23:33:14 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 23:33:14 2016 -0400

----------------------------------------------------------------------

----------------------------------------------------------------------



[12/28] accumulo git commit: Merge branch '1.8'

Posted by el...@apache.org.
Merge branch '1.8'


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/bfc2a5b7
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/bfc2a5b7
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/bfc2a5b7

Branch: refs/heads/master
Commit: bfc2a5b7b2dbb452436074529cbdd1b32ab955a9
Parents: 5c576af a24a286
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 22:38:32 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 22:38:32 2016 -0400

----------------------------------------------------------------------
 .../accumulo/core/rpc/TTimeoutTransport.java    | 24 ++++++++++++++------
 .../apache/accumulo/core/rpc/ThriftUtil.java    |  8 ++++++-
 .../accumulo/server/util/TServerUtilsTest.java  | 19 ++++++++++------
 3 files changed, 36 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/bfc2a5b7/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
----------------------------------------------------------------------


[11/28] accumulo git commit: ACCUMULO-4349 Fix test bind logic to pass regardless of network configuration

Posted by el...@apache.org.
ACCUMULO-4349 Fix test bind logic to pass regardless of network configuration

ServerSocket will bind to all interfaces which can cause the test to fail
when it expects that subsequent attempts to bind the same port will fail.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a24a286d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a24a286d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a24a286d

Branch: refs/heads/master
Commit: a24a286d8e547467403b03cf5297bf1364701594
Parents: 670f2ea
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 22:16:54 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 22:16:54 2016 -0400

----------------------------------------------------------------------
 .../accumulo/server/util/TServerUtilsTest.java   | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/a24a286d/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
index e6761a5..458118d 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
@@ -232,8 +232,9 @@ public class TServerUtilsTest {
   @Test(expected = UnknownHostException.class)
   public void testStartServerUsedPort() throws Exception {
     int port = getFreePort(1024);
+    InetAddress addr = InetAddress.getByName("localhost");
     // Bind to the port
-    ServerSocket s = new ServerSocket(port);
+    ServerSocket s = new ServerSocket(port, 50, addr);
     ((ConfigurationCopy) factory.getConfiguration()).set(Property.TSERV_CLIENTPORT, Integer.toString(port));
     try {
       startServer();
@@ -247,7 +248,8 @@ public class TServerUtilsTest {
     TServer server = null;
     int[] port = findTwoFreeSequentialPorts(1024);
     // Bind to the port
-    ServerSocket s = new ServerSocket(port[0]);
+    InetAddress addr = InetAddress.getByName("localhost");
+    ServerSocket s = new ServerSocket(port[0], 50, addr);
     ((ConfigurationCopy) factory.getConfiguration()).set(Property.TSERV_CLIENTPORT, Integer.toString(port[0]));
     ((ConfigurationCopy) factory.getConfiguration()).set(Property.TSERV_PORTSEARCH, "true");
     try {
@@ -286,10 +288,11 @@ public class TServerUtilsTest {
   @Test
   public void testStartServerPortRangeFirstPortUsed() throws Exception {
     TServer server = null;
+    InetAddress addr = InetAddress.getByName("localhost");
     int[] port = findTwoFreeSequentialPorts(1024);
     String portRange = Integer.toString(port[0]) + "-" + Integer.toString(port[1]);
     // Bind to the port
-    ServerSocket s = new ServerSocket(port[0]);
+    ServerSocket s = new ServerSocket(port[0], 50, addr);
     ((ConfigurationCopy) factory.getConfiguration()).set(Property.TSERV_CLIENTPORT, portRange);
     try {
       ServerAddress address = startServer();
@@ -305,7 +308,7 @@ public class TServerUtilsTest {
     }
   }
 
-  private int[] findTwoFreeSequentialPorts(int startingAddress) {
+  private int[] findTwoFreeSequentialPorts(int startingAddress) throws UnknownHostException {
     boolean sequential = false;
     int low = startingAddress;
     int high = 0;
@@ -317,10 +320,11 @@ public class TServerUtilsTest {
     return new int[] {low, high};
   }
 
-  private int getFreePort(int startingAddress) {
+  private int getFreePort(int startingAddress) throws UnknownHostException {
+    final InetAddress addr = InetAddress.getByName("localhost");
     for (int i = startingAddress; i < 65535; i++) {
       try {
-        ServerSocket s = new ServerSocket(i);
+        ServerSocket s = new ServerSocket(i, 50, addr);
         int port = s.getLocalPort();
         s.close();
         return port;
@@ -336,7 +340,8 @@ public class TServerUtilsTest {
     ClientServiceHandler clientHandler = new ClientServiceHandler(ctx, null, null);
     Iface rpcProxy = RpcWrapper.service(clientHandler, new Processor<Iface>(clientHandler));
     Processor<Iface> processor = new Processor<Iface>(rpcProxy);
-    String hostname = InetAddress.getLocalHost().getHostName();
+    // "localhost" explicitly to make sure we can always bind to that interface (avoids DNS misconfiguration)
+    String hostname = "localhost";
 
     return TServerUtils.startServer(ctx, hostname, Property.TSERV_CLIENTPORT, processor, "TServerUtilsTest", "TServerUtilsTestThread",
         Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, Property.TSERV_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);


[02/28] accumulo git commit: ACCUMULO-4317 Ensure the socket is closed on failure to set up a Thrift client

Posted by el...@apache.org.
ACCUMULO-4317 Ensure the socket is closed on failure to set up a Thrift client

Signed-off-by: Josh Elser <el...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/561f1893
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/561f1893
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/561f1893

Branch: refs/heads/1.7
Commit: 561f1893b9f239457e16c046702d8ba9df1474e2
Parents: d5ea00f
Author: Michiel Vanderlee <ma...@gmail.com>
Authored: Mon Jun 20 18:09:37 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 18:09:37 2016 -0400

----------------------------------------------------------------------
 .../accumulo/core/util/TTimeoutTransport.java   | 24 ++++++++++++++------
 .../apache/accumulo/core/util/ThriftUtil.java   |  8 ++++++-
 2 files changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/561f1893/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
index 1eac8be..52a9a98 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
@@ -66,12 +66,22 @@ public class TTimeoutTransport {
   }
 
   public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException {
-    Socket socket = SelectorProvider.provider().openSocketChannel().socket();
-    socket.setSoLinger(false, 0);
-    socket.setTcpNoDelay(true);
-    socket.connect(addr);
-    InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
-    OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
-    return new TIOStreamTransport(input, output);
+    Socket socket = null;
+    try {
+      socket = SelectorProvider.provider().openSocketChannel().socket();
+      socket.setSoLinger(false, 0);
+      socket.setTcpNoDelay(true);
+      socket.connect(addr);
+      InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
+      OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
+      return new TIOStreamTransport(input, output);
+    } catch (IOException e) {
+      try {
+        if (socket != null)
+          socket.close();
+      } catch (IOException ioe) {}
+
+      throw e;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/561f1893/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
index 29f7fb7..48ff3f2 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
@@ -349,11 +349,17 @@ public class ThriftUtil {
    *          Socket timeout
    */
   private static TSocket createClient(SSLSocketFactory factory, String host, int port, int timeout) throws TTransportException {
+    SSLSocket socket = null;
     try {
-      SSLSocket socket = (SSLSocket) factory.createSocket(host, port);
+      socket = (SSLSocket) factory.createSocket(host, port);
       socket.setSoTimeout(timeout);
       return new TSocket(socket);
     } catch (Exception e) {
+      try {
+        if (socket != null)
+          socket.close();
+      } catch (IOException ioe) {}
+
       throw new TTransportException("Could not connect to " + host + " on port " + port, e);
     }
   }


[08/28] accumulo git commit: Merge branch '1.7' into 1.8

Posted by el...@apache.org.
Merge branch '1.7' into 1.8


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/670f2ea8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/670f2ea8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/670f2ea8

Branch: refs/heads/1.8
Commit: 670f2ea8ade31746194417f4774b7f522023404f
Parents: 519e5b7 f81a22e
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 18:22:49 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 18:22:49 2016 -0400

----------------------------------------------------------------------
 .../accumulo/core/rpc/TTimeoutTransport.java    | 24 ++++++++++++++------
 .../apache/accumulo/core/rpc/ThriftUtil.java    |  8 ++++++-
 2 files changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------



[10/28] accumulo git commit: ACCUMULO-4349 Fix test bind logic to pass regardless of network configuration

Posted by el...@apache.org.
ACCUMULO-4349 Fix test bind logic to pass regardless of network configuration

ServerSocket will bind to all interfaces which can cause the test to fail
when it expects that subsequent attempts to bind the same port will fail.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a24a286d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a24a286d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a24a286d

Branch: refs/heads/1.8
Commit: a24a286d8e547467403b03cf5297bf1364701594
Parents: 670f2ea
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 22:16:54 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 22:16:54 2016 -0400

----------------------------------------------------------------------
 .../accumulo/server/util/TServerUtilsTest.java   | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/a24a286d/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
index e6761a5..458118d 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
@@ -232,8 +232,9 @@ public class TServerUtilsTest {
   @Test(expected = UnknownHostException.class)
   public void testStartServerUsedPort() throws Exception {
     int port = getFreePort(1024);
+    InetAddress addr = InetAddress.getByName("localhost");
     // Bind to the port
-    ServerSocket s = new ServerSocket(port);
+    ServerSocket s = new ServerSocket(port, 50, addr);
     ((ConfigurationCopy) factory.getConfiguration()).set(Property.TSERV_CLIENTPORT, Integer.toString(port));
     try {
       startServer();
@@ -247,7 +248,8 @@ public class TServerUtilsTest {
     TServer server = null;
     int[] port = findTwoFreeSequentialPorts(1024);
     // Bind to the port
-    ServerSocket s = new ServerSocket(port[0]);
+    InetAddress addr = InetAddress.getByName("localhost");
+    ServerSocket s = new ServerSocket(port[0], 50, addr);
     ((ConfigurationCopy) factory.getConfiguration()).set(Property.TSERV_CLIENTPORT, Integer.toString(port[0]));
     ((ConfigurationCopy) factory.getConfiguration()).set(Property.TSERV_PORTSEARCH, "true");
     try {
@@ -286,10 +288,11 @@ public class TServerUtilsTest {
   @Test
   public void testStartServerPortRangeFirstPortUsed() throws Exception {
     TServer server = null;
+    InetAddress addr = InetAddress.getByName("localhost");
     int[] port = findTwoFreeSequentialPorts(1024);
     String portRange = Integer.toString(port[0]) + "-" + Integer.toString(port[1]);
     // Bind to the port
-    ServerSocket s = new ServerSocket(port[0]);
+    ServerSocket s = new ServerSocket(port[0], 50, addr);
     ((ConfigurationCopy) factory.getConfiguration()).set(Property.TSERV_CLIENTPORT, portRange);
     try {
       ServerAddress address = startServer();
@@ -305,7 +308,7 @@ public class TServerUtilsTest {
     }
   }
 
-  private int[] findTwoFreeSequentialPorts(int startingAddress) {
+  private int[] findTwoFreeSequentialPorts(int startingAddress) throws UnknownHostException {
     boolean sequential = false;
     int low = startingAddress;
     int high = 0;
@@ -317,10 +320,11 @@ public class TServerUtilsTest {
     return new int[] {low, high};
   }
 
-  private int getFreePort(int startingAddress) {
+  private int getFreePort(int startingAddress) throws UnknownHostException {
+    final InetAddress addr = InetAddress.getByName("localhost");
     for (int i = startingAddress; i < 65535; i++) {
       try {
-        ServerSocket s = new ServerSocket(i);
+        ServerSocket s = new ServerSocket(i, 50, addr);
         int port = s.getLocalPort();
         s.close();
         return port;
@@ -336,7 +340,8 @@ public class TServerUtilsTest {
     ClientServiceHandler clientHandler = new ClientServiceHandler(ctx, null, null);
     Iface rpcProxy = RpcWrapper.service(clientHandler, new Processor<Iface>(clientHandler));
     Processor<Iface> processor = new Processor<Iface>(rpcProxy);
-    String hostname = InetAddress.getLocalHost().getHostName();
+    // "localhost" explicitly to make sure we can always bind to that interface (avoids DNS misconfiguration)
+    String hostname = "localhost";
 
     return TServerUtils.startServer(ctx, hostname, Property.TSERV_CLIENTPORT, processor, "TServerUtilsTest", "TServerUtilsTestThread",
         Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, Property.TSERV_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);


[25/28] accumulo git commit: Merge remote-tracking branch 'origin/1.8' into 1.8

Posted by el...@apache.org.
Merge remote-tracking branch 'origin/1.8' into 1.8


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/af802612
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/af802612
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/af802612

Branch: refs/heads/master
Commit: af802612965e903a7bfa034bc2afaa02e21ad735
Parents: a24a286 94bf129
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 23:33:11 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 23:33:11 2016 -0400

----------------------------------------------------------------------
 .../apache/accumulo/core/conf/PropertyType.java |  3 -
 .../core/metadata/MetadataLocationObtainer.java | 21 ++++---
 .../core/client/impl/ScannerOptionsTest.java    | 58 +++++++++++---------
 pom.xml                                         |  2 +-
 .../accumulo/server/rpc/TServerUtils.java       |  4 +-
 .../BaseHostRegexTableLoadBalancerTest.java     |  7 ++-
 .../accumulo/server/util/TServerUtilsTest.java  |  5 ++
 7 files changed, 58 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/af802612/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
----------------------------------------------------------------------


[05/28] accumulo git commit: Merge branch '1.6' into 1.7

Posted by el...@apache.org.
Merge branch '1.6' into 1.7


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f81a22ec
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f81a22ec
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f81a22ec

Branch: refs/heads/1.8
Commit: f81a22ecb05e079eeb558705fd95740c9253c79e
Parents: 7b92053 561f189
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 18:22:41 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 18:22:41 2016 -0400

----------------------------------------------------------------------
 .../accumulo/core/rpc/TTimeoutTransport.java    | 24 ++++++++++++++------
 .../apache/accumulo/core/rpc/ThriftUtil.java    |  8 ++++++-
 2 files changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f81a22ec/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
index 400d90a,0000000..cc3f51b
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
@@@ -1,77 -1,0 +1,87 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.rpc;
 +
 +import java.io.BufferedInputStream;
 +import java.io.BufferedOutputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.OutputStream;
 +import java.lang.reflect.Method;
 +import java.net.InetSocketAddress;
 +import java.net.Socket;
 +import java.net.SocketAddress;
 +import java.nio.channels.spi.SelectorProvider;
 +
 +import org.apache.hadoop.net.NetUtils;
 +import org.apache.thrift.transport.TIOStreamTransport;
 +import org.apache.thrift.transport.TTransport;
 +
 +import com.google.common.net.HostAndPort;
 +
 +public class TTimeoutTransport {
 +
 +  private static volatile Method GET_INPUT_STREAM_METHOD = null;
 +
 +  private static Method getNetUtilsInputStreamMethod() {
 +    if (null == GET_INPUT_STREAM_METHOD) {
 +      synchronized (TTimeoutTransport.class) {
 +        if (null == GET_INPUT_STREAM_METHOD) {
 +          try {
 +            GET_INPUT_STREAM_METHOD = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE);
 +          } catch (Exception e) {
 +            throw new RuntimeException(e);
 +          }
 +        }
 +      }
 +    }
 +
 +    return GET_INPUT_STREAM_METHOD;
 +  }
 +
 +  private static InputStream getInputStream(Socket socket, long timeout) {
 +    try {
 +      return (InputStream) getNetUtilsInputStreamMethod().invoke(null, socket, timeout);
 +    } catch (Exception e) {
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  public static TTransport create(HostAndPort addr, long timeoutMillis) throws IOException {
 +    return create(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis);
 +  }
 +
 +  public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException {
-     Socket socket = SelectorProvider.provider().openSocketChannel().socket();
-     socket.setSoLinger(false, 0);
-     socket.setTcpNoDelay(true);
-     socket.connect(addr);
-     InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
-     OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
-     return new TIOStreamTransport(input, output);
++    Socket socket = null;
++    try {
++      socket = SelectorProvider.provider().openSocketChannel().socket();
++      socket.setSoLinger(false, 0);
++      socket.setTcpNoDelay(true);
++      socket.connect(addr);
++      InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
++      OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
++      return new TIOStreamTransport(input, output);
++    } catch (IOException e) {
++      try {
++        if (socket != null)
++          socket.close();
++      } catch (IOException ioe) {}
++
++      throw e;
++    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f81a22ec/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index f3cb9b5,0000000..be4238e
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@@ -1,457 -1,0 +1,463 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.rpc;
 +
 +import java.io.FileInputStream;
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.security.KeyStore;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.Random;
 +
 +import javax.net.ssl.KeyManagerFactory;
 +import javax.net.ssl.SSLContext;
 +import javax.net.ssl.SSLSocket;
 +import javax.net.ssl.SSLSocketFactory;
 +import javax.net.ssl.TrustManagerFactory;
 +
 +import org.apache.accumulo.core.client.impl.ClientContext;
 +import org.apache.accumulo.core.client.impl.ThriftTransportPool;
 +import org.apache.accumulo.core.rpc.SaslConnectionParams.SaslMechanism;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.hadoop.security.UserGroupInformation;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.TServiceClient;
 +import org.apache.thrift.TServiceClientFactory;
 +import org.apache.thrift.protocol.TProtocolFactory;
 +import org.apache.thrift.transport.TFramedTransport;
 +import org.apache.thrift.transport.TSSLTransportFactory;
 +import org.apache.thrift.transport.TSaslClientTransport;
 +import org.apache.thrift.transport.TSocket;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +import org.apache.thrift.transport.TTransportFactory;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.net.HostAndPort;
 +
 +/**
 + * Factory methods for creating Thrift client objects
 + */
 +public class ThriftUtil {
 +  private static final Logger log = LoggerFactory.getLogger(ThriftUtil.class);
 +
 +  private static final TraceProtocolFactory protocolFactory = new TraceProtocolFactory();
 +  private static final TFramedTransport.Factory transportFactory = new TFramedTransport.Factory(Integer.MAX_VALUE);
 +  private static final Map<Integer,TTransportFactory> factoryCache = new HashMap<Integer,TTransportFactory>();
 +
 +  public static final String GSSAPI = "GSSAPI", DIGEST_MD5 = "DIGEST-MD5";
 +
 +  private static final Random SASL_BACKOFF_RAND = new Random();
 +  private static final int RELOGIN_MAX_BACKOFF = 5000;
 +
 +  /**
 +   * An instance of {@link TraceProtocolFactory}
 +   *
 +   * @return The default Thrift TProtocolFactory for RPC
 +   */
 +  public static TProtocolFactory protocolFactory() {
 +    return protocolFactory;
 +  }
 +
 +  /**
 +   * An instance of {@link org.apache.thrift.transport.TFramedTransport.Factory}
 +   *
 +   * @return The default Thrift TTransportFactory for RPC
 +   */
 +  public static TTransportFactory transportFactory() {
 +    return transportFactory;
 +  }
 +
 +  /**
 +   * Create a Thrift client using the given factory and transport
 +   */
 +  public static <T extends TServiceClient> T createClient(TServiceClientFactory<T> factory, TTransport transport) {
 +    return factory.getClient(protocolFactory.getProtocol(transport), protocolFactory.getProtocol(transport));
 +  }
 +
 +  /**
 +   * Create a Thrift client using the given factory with a pooled transport (if available), the address, and client context with no timeout.
 +   *
 +   * @param factory
 +   *          Thrift client factory
 +   * @param address
 +   *          Server address for client to connect to
 +   * @param context
 +   *          RPC options
 +   */
 +  public static <T extends TServiceClient> T getClientNoTimeout(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context)
 +      throws TTransportException {
 +    return getClient(factory, address, context, 0);
 +  }
 +
 +  /**
 +   * Create a Thrift client using the given factory with a pooled transport (if available), the address and client context. Client timeout is extracted from the
 +   * ClientContext
 +   *
 +   * @param factory
 +   *          Thrift client factory
 +   * @param address
 +   *          Server address for client to connect to
 +   * @param context
 +   *          RPC options
 +   */
 +  public static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context) throws TTransportException {
 +    TTransport transport = ThriftTransportPool.getInstance().getTransport(address, context.getClientTimeoutInMillis(), context);
 +    return createClient(factory, transport);
 +  }
 +
 +  /**
 +   * Create a Thrift client using the given factory with a pooled transport (if available) using the address, client context and timeou
 +   *
 +   * @param factory
 +   *          Thrift client factory
 +   * @param address
 +   *          Server address for client to connect to
 +   * @param context
 +   *          RPC options
 +   * @param timeout
 +   *          Socket timeout which overrides the ClientContext timeout
 +   */
 +  private static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context, long timeout)
 +      throws TTransportException {
 +    TTransport transport = ThriftTransportPool.getInstance().getTransport(address, timeout, context);
 +    return createClient(factory, transport);
 +  }
 +
 +  /**
 +   * Return the transport used by the client to the shared pool.
 +   *
 +   * @param iface
 +   *          The Client being returned or null.
 +   */
 +  public static void returnClient(TServiceClient iface) { // Eew... the typing here is horrible
 +    if (iface != null) {
 +      ThriftTransportPool.getInstance().returnTransport(iface.getInputProtocol().getTransport());
 +    }
 +  }
 +
 +  /**
 +   * Create a TabletServer Thrift client
 +   *
 +   * @param address
 +   *          Server address for client to connect to
 +   * @param context
 +   *          RPC options
 +   */
 +  public static TabletClientService.Client getTServerClient(HostAndPort address, ClientContext context) throws TTransportException {
 +    return getClient(new TabletClientService.Client.Factory(), address, context);
 +  }
 +
 +  /**
 +   * Create a TabletServer Thrift client
 +   *
 +   * @param address
 +   *          Server address for client to connect to
 +   * @param context
 +   *          Options for connecting to the server
 +   * @param timeout
 +   *          Socket timeout which overrides the ClientContext timeout
 +   */
 +  public static TabletClientService.Client getTServerClient(HostAndPort address, ClientContext context, long timeout) throws TTransportException {
 +    return getClient(new TabletClientService.Client.Factory(), address, context, timeout);
 +  }
 +
 +  /**
 +   * Create a transport that is not pooled
 +   *
 +   * @param address
 +   *          Server address to open the transport to
 +   * @param context
 +   *          RPC options
 +   */
 +  public static TTransport createTransport(HostAndPort address, ClientContext context) throws TException {
 +    return createClientTransport(address, (int) context.getClientTimeoutInMillis(), context.getClientSslParams(), context.getSaslParams());
 +  }
 +
 +  /**
 +   * Get an instance of the TTransportFactory with the provided maximum frame size
 +   *
 +   * @param maxFrameSize
 +   *          Maximum Thrift message frame size
 +   * @return A, possibly cached, TTransportFactory with the requested maximum frame size
 +   */
 +  public static synchronized TTransportFactory transportFactory(int maxFrameSize) {
 +    TTransportFactory factory = factoryCache.get(maxFrameSize);
 +    if (factory == null) {
 +      factory = new TFramedTransport.Factory(maxFrameSize);
 +      factoryCache.put(maxFrameSize, factory);
 +    }
 +    return factory;
 +  }
 +
 +  /**
 +   * @see #transportFactory(int)
 +   */
 +  public static synchronized TTransportFactory transportFactory(long maxFrameSize) {
 +    if (maxFrameSize > Integer.MAX_VALUE || maxFrameSize < 1)
 +      throw new RuntimeException("Thrift transport frames are limited to " + Integer.MAX_VALUE);
 +    return transportFactory((int) maxFrameSize);
 +  }
 +
 +  /**
 +   * Create a TTransport for clients to the given address with the provided socket timeout and session-layer configuration
 +   *
 +   * @param address
 +   *          Server address to connect to
 +   * @param timeout
 +   *          Client socket timeout
 +   * @param sslParams
 +   *          RPC options for SSL servers
 +   * @param saslParams
 +   *          RPC options for SASL servers
 +   * @return An open TTransport which must be closed when finished
 +   */
 +  public static TTransport createClientTransport(HostAndPort address, int timeout, SslConnectionParams sslParams, SaslConnectionParams saslParams)
 +      throws TTransportException {
 +    boolean success = false;
 +    TTransport transport = null;
 +    try {
 +      if (sslParams != null) {
 +        // The check in AccumuloServerContext ensures that servers are brought up with sane configurations, but we also want to validate clients
 +        if (null != saslParams) {
 +          throw new IllegalStateException("Cannot use both SSL and SASL");
 +        }
 +
 +        log.trace("Creating SSL client transport");
 +
 +        // TSSLTransportFactory handles timeout 0 -> forever natively
 +        if (sslParams.useJsse()) {
 +          transport = TSSLTransportFactory.getClientSocket(address.getHostText(), address.getPort(), timeout);
 +        } else {
 +          // JDK6's factory doesn't appear to pass the protocol onto the Socket properly so we have
 +          // to do some magic to make sure that happens. Not an issue in JDK7
 +
 +          // Taken from thrift-0.9.1 to make the SSLContext
 +          SSLContext sslContext = createSSLContext(sslParams);
 +
 +          // Create the factory from it
 +          SSLSocketFactory sslSockFactory = sslContext.getSocketFactory();
 +
 +          // Wrap the real factory with our own that will set the protocol on the Socket before returning it
 +          ProtocolOverridingSSLSocketFactory wrappingSslSockFactory = new ProtocolOverridingSSLSocketFactory(sslSockFactory,
 +              new String[] {sslParams.getClientProtocol()});
 +
 +          // Create the TSocket from that
 +          transport = createClient(wrappingSslSockFactory, address.getHostText(), address.getPort(), timeout);
 +          // TSSLTransportFactory leaves transports open, so no need to open here
 +        }
 +
 +        transport = ThriftUtil.transportFactory().getTransport(transport);
 +      } else if (null != saslParams) {
 +        if (!UserGroupInformation.isSecurityEnabled()) {
 +          throw new IllegalStateException("Expected Kerberos security to be enabled if SASL is in use");
 +        }
 +
 +        log.trace("Creating SASL connection to {}:{}", address.getHostText(), address.getPort());
 +
 +        // Make sure a timeout is set
 +        try {
 +          transport = TTimeoutTransport.create(address, timeout);
 +        } catch (IOException e) {
 +          log.warn("Failed to open transport to {}", address);
 +          throw new TTransportException(e);
 +        }
 +
 +        try {
 +          // Log in via UGI, ensures we have logged in with our KRB credentials
 +          final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
 +
 +          // Is this pricey enough that we want to cache it?
 +          final String hostname = InetAddress.getByName(address.getHostText()).getCanonicalHostName();
 +
 +          final SaslMechanism mechanism = saslParams.getMechanism();
 +
 +          log.trace("Opening transport to server as {} to {}/{} using {}", currentUser, saslParams.getKerberosServerPrimary(), hostname, mechanism);
 +
 +          // Create the client SASL transport using the information for the server
 +          // Despite the 'protocol' argument seeming to be useless, it *must* be the primary of the server being connected to
 +          transport = new TSaslClientTransport(mechanism.getMechanismName(), null, saslParams.getKerberosServerPrimary(), hostname,
 +              saslParams.getSaslProperties(), saslParams.getCallbackHandler(), transport);
 +
 +          // Wrap it all in a processor which will run with a doAs the current user
 +          transport = new UGIAssumingTransport(transport, currentUser);
 +
 +          // Open the transport
 +          transport.open();
 +        } catch (TTransportException e) {
 +          log.warn("Failed to open SASL transport", e);
 +
 +          // We might have had a valid ticket, but it expired. We'll let the caller retry, but we will attempt to re-login to make the next attempt work.
 +          // Sadly, we have no way to determine the actual reason we got this TTransportException other than inspecting the exception msg.
 +          log.debug("Caught TTransportException opening SASL transport, checking if re-login is necessary before propagating the exception.");
 +          attemptClientReLogin();
 +
 +          throw e;
 +        } catch (IOException e) {
 +          log.warn("Failed to open SASL transport", e);
 +          throw new TTransportException(e);
 +        }
 +      } else {
 +        log.trace("Opening normal transport");
 +        if (timeout == 0) {
 +          transport = new TSocket(address.getHostText(), address.getPort());
 +          transport.open();
 +        } else {
 +          try {
 +            transport = TTimeoutTransport.create(address, timeout);
 +          } catch (IOException ex) {
 +            log.warn("Failed to open transport to " + address);
 +            throw new TTransportException(ex);
 +          }
 +
 +          // Open the transport
 +          transport.open();
 +        }
 +        transport = ThriftUtil.transportFactory().getTransport(transport);
 +      }
 +      success = true;
 +    } finally {
 +      if (!success && transport != null) {
 +        transport.close();
 +      }
 +    }
 +    return transport;
 +  }
 +
 +  /**
 +   * Some wonderful snippets of documentation from HBase on performing the re-login client-side (as well as server-side) in the following paragraph. We want to
 +   * attempt a re-login to automatically refresh the client's Krb "credentials" (remember, a server might also be a client, master sending RPC to tserver), but
 +   * we have to take care to avoid Kerberos' replay attack protection.
 +   * <p>
 +   * If multiple clients with the same principal try to connect to the same server at the same time, the server assumes a replay attack is in progress. This is
 +   * a feature of kerberos. In order to work around this, what is done is that the client backs off randomly and tries to initiate the connection again. The
 +   * other problem is to do with ticket expiry. To handle that, a relogin is attempted.
 +   */
 +  static void attemptClientReLogin() {
 +    try {
 +      UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
 +      if (null == loginUser || !loginUser.hasKerberosCredentials()) {
 +        // We should have already checked that we're logged in and have credentials. A precondition-like check.
 +        throw new RuntimeException("Expected to find Kerberos UGI credentials, but did not");
 +      }
 +      UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
 +      // A Proxy user is the "effective user" (in name only), riding on top of the "real user"'s Krb credentials.
 +      UserGroupInformation realUser = currentUser.getRealUser();
 +
 +      // re-login only in case it is the login user or superuser.
 +      if (loginUser.equals(currentUser) || loginUser.equals(realUser)) {
 +        if (UserGroupInformation.isLoginKeytabBased()) {
 +          log.info("Performing keytab-based Kerberos re-login");
 +          loginUser.reloginFromKeytab();
 +        } else {
 +          log.info("Performing ticket-cache-based Kerberos re-login");
 +          loginUser.reloginFromTicketCache();
 +        }
 +
 +        // Avoid the replay attack protection, sleep 1 to 5000ms
 +        try {
 +          Thread.sleep((SASL_BACKOFF_RAND.nextInt(RELOGIN_MAX_BACKOFF) + 1));
 +        } catch (InterruptedException e) {
 +          Thread.currentThread().interrupt();
 +          return;
 +        }
 +      } else {
 +        log.debug("Not attempting Kerberos re-login: loginUser={}, currentUser={}, realUser={}", loginUser, currentUser, realUser);
 +      }
 +    } catch (IOException e) {
 +      // The inability to check is worrisome and deserves a RuntimeException instead of a propagated IO-like Exception.
 +      log.warn("Failed to check (and/or perform) Kerberos client re-login", e);
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  /**
 +   * Lifted from TSSLTransportFactory in Thrift-0.9.1. The method to create a client socket with an SSLContextFactory object is not visibile to us. Have to use
 +   * SslConnectionParams instead of TSSLTransportParameters because no getters exist on TSSLTransportParameters.
 +   *
 +   * @param params
 +   *          Parameters to use to create the SSLContext
 +   */
 +  private static SSLContext createSSLContext(SslConnectionParams params) throws TTransportException {
 +    SSLContext ctx;
 +    try {
 +      ctx = SSLContext.getInstance(params.getClientProtocol());
 +      TrustManagerFactory tmf = null;
 +      KeyManagerFactory kmf = null;
 +
 +      if (params.isTrustStoreSet()) {
 +        tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
 +        KeyStore ts = KeyStore.getInstance(params.getTrustStoreType());
 +        try (FileInputStream fis = new FileInputStream(params.getTrustStorePath())) {
 +          ts.load(fis, params.getTrustStorePass().toCharArray());
 +        }
 +        tmf.init(ts);
 +      }
 +
 +      if (params.isKeyStoreSet()) {
 +        kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
 +        KeyStore ks = KeyStore.getInstance(params.getKeyStoreType());
 +        try (FileInputStream fis = new FileInputStream(params.getKeyStorePath())) {
 +          ks.load(fis, params.getKeyStorePass().toCharArray());
 +        }
 +        kmf.init(ks, params.getKeyStorePass().toCharArray());
 +      }
 +
 +      if (params.isKeyStoreSet() && params.isTrustStoreSet()) {
 +        ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
 +      } else if (params.isKeyStoreSet()) {
 +        ctx.init(kmf.getKeyManagers(), null, null);
 +      } else {
 +        ctx.init(null, tmf.getTrustManagers(), null);
 +      }
 +
 +    } catch (Exception e) {
 +      throw new TTransportException("Error creating the transport", e);
 +    }
 +    return ctx;
 +  }
 +
 +  /**
 +   * Lifted from Thrift-0.9.1 because it was private. Create an SSLSocket with the given factory, host:port, and timeout.
 +   *
 +   * @param factory
 +   *          Factory to create the socket from
 +   * @param host
 +   *          Destination host
 +   * @param port
 +   *          Destination port
 +   * @param timeout
 +   *          Socket timeout
 +   */
 +  private static TSocket createClient(SSLSocketFactory factory, String host, int port, int timeout) throws TTransportException {
++    SSLSocket socket = null;
 +    try {
-       SSLSocket socket = (SSLSocket) factory.createSocket(host, port);
++      socket = (SSLSocket) factory.createSocket(host, port);
 +      socket.setSoTimeout(timeout);
 +      return new TSocket(socket);
 +    } catch (Exception e) {
++      try {
++        if (socket != null)
++          socket.close();
++      } catch (IOException ioe) {}
++
 +      throw new TTransportException("Could not connect to " + host + " on port " + port, e);
 +    }
 +  }
 +}


[07/28] accumulo git commit: Merge branch '1.6' into 1.7

Posted by el...@apache.org.
Merge branch '1.6' into 1.7


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f81a22ec
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f81a22ec
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f81a22ec

Branch: refs/heads/master
Commit: f81a22ecb05e079eeb558705fd95740c9253c79e
Parents: 7b92053 561f189
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 18:22:41 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 18:22:41 2016 -0400

----------------------------------------------------------------------
 .../accumulo/core/rpc/TTimeoutTransport.java    | 24 ++++++++++++++------
 .../apache/accumulo/core/rpc/ThriftUtil.java    |  8 ++++++-
 2 files changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f81a22ec/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
index 400d90a,0000000..cc3f51b
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
@@@ -1,77 -1,0 +1,87 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.rpc;
 +
 +import java.io.BufferedInputStream;
 +import java.io.BufferedOutputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.OutputStream;
 +import java.lang.reflect.Method;
 +import java.net.InetSocketAddress;
 +import java.net.Socket;
 +import java.net.SocketAddress;
 +import java.nio.channels.spi.SelectorProvider;
 +
 +import org.apache.hadoop.net.NetUtils;
 +import org.apache.thrift.transport.TIOStreamTransport;
 +import org.apache.thrift.transport.TTransport;
 +
 +import com.google.common.net.HostAndPort;
 +
 +public class TTimeoutTransport {
 +
 +  private static volatile Method GET_INPUT_STREAM_METHOD = null;
 +
 +  private static Method getNetUtilsInputStreamMethod() {
 +    if (null == GET_INPUT_STREAM_METHOD) {
 +      synchronized (TTimeoutTransport.class) {
 +        if (null == GET_INPUT_STREAM_METHOD) {
 +          try {
 +            GET_INPUT_STREAM_METHOD = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE);
 +          } catch (Exception e) {
 +            throw new RuntimeException(e);
 +          }
 +        }
 +      }
 +    }
 +
 +    return GET_INPUT_STREAM_METHOD;
 +  }
 +
 +  private static InputStream getInputStream(Socket socket, long timeout) {
 +    try {
 +      return (InputStream) getNetUtilsInputStreamMethod().invoke(null, socket, timeout);
 +    } catch (Exception e) {
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  public static TTransport create(HostAndPort addr, long timeoutMillis) throws IOException {
 +    return create(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis);
 +  }
 +
 +  public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException {
-     Socket socket = SelectorProvider.provider().openSocketChannel().socket();
-     socket.setSoLinger(false, 0);
-     socket.setTcpNoDelay(true);
-     socket.connect(addr);
-     InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
-     OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
-     return new TIOStreamTransport(input, output);
++    Socket socket = null;
++    try {
++      socket = SelectorProvider.provider().openSocketChannel().socket();
++      socket.setSoLinger(false, 0);
++      socket.setTcpNoDelay(true);
++      socket.connect(addr);
++      InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
++      OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
++      return new TIOStreamTransport(input, output);
++    } catch (IOException e) {
++      try {
++        if (socket != null)
++          socket.close();
++      } catch (IOException ioe) {}
++
++      throw e;
++    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f81a22ec/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index f3cb9b5,0000000..be4238e
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@@ -1,457 -1,0 +1,463 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.rpc;
 +
 +import java.io.FileInputStream;
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.security.KeyStore;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.Random;
 +
 +import javax.net.ssl.KeyManagerFactory;
 +import javax.net.ssl.SSLContext;
 +import javax.net.ssl.SSLSocket;
 +import javax.net.ssl.SSLSocketFactory;
 +import javax.net.ssl.TrustManagerFactory;
 +
 +import org.apache.accumulo.core.client.impl.ClientContext;
 +import org.apache.accumulo.core.client.impl.ThriftTransportPool;
 +import org.apache.accumulo.core.rpc.SaslConnectionParams.SaslMechanism;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.hadoop.security.UserGroupInformation;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.TServiceClient;
 +import org.apache.thrift.TServiceClientFactory;
 +import org.apache.thrift.protocol.TProtocolFactory;
 +import org.apache.thrift.transport.TFramedTransport;
 +import org.apache.thrift.transport.TSSLTransportFactory;
 +import org.apache.thrift.transport.TSaslClientTransport;
 +import org.apache.thrift.transport.TSocket;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +import org.apache.thrift.transport.TTransportFactory;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.net.HostAndPort;
 +
 +/**
 + * Factory methods for creating Thrift client objects
 + */
 +public class ThriftUtil {
 +  private static final Logger log = LoggerFactory.getLogger(ThriftUtil.class);
 +
 +  private static final TraceProtocolFactory protocolFactory = new TraceProtocolFactory();
 +  private static final TFramedTransport.Factory transportFactory = new TFramedTransport.Factory(Integer.MAX_VALUE);
 +  private static final Map<Integer,TTransportFactory> factoryCache = new HashMap<Integer,TTransportFactory>();
 +
 +  public static final String GSSAPI = "GSSAPI", DIGEST_MD5 = "DIGEST-MD5";
 +
 +  private static final Random SASL_BACKOFF_RAND = new Random();
 +  private static final int RELOGIN_MAX_BACKOFF = 5000;
 +
 +  /**
 +   * An instance of {@link TraceProtocolFactory}
 +   *
 +   * @return The default Thrift TProtocolFactory for RPC
 +   */
 +  public static TProtocolFactory protocolFactory() {
 +    return protocolFactory;
 +  }
 +
 +  /**
 +   * An instance of {@link org.apache.thrift.transport.TFramedTransport.Factory}
 +   *
 +   * @return The default Thrift TTransportFactory for RPC
 +   */
 +  public static TTransportFactory transportFactory() {
 +    return transportFactory;
 +  }
 +
 +  /**
 +   * Create a Thrift client using the given factory and transport
 +   */
 +  public static <T extends TServiceClient> T createClient(TServiceClientFactory<T> factory, TTransport transport) {
 +    return factory.getClient(protocolFactory.getProtocol(transport), protocolFactory.getProtocol(transport));
 +  }
 +
 +  /**
 +   * Create a Thrift client using the given factory with a pooled transport (if available), the address, and client context with no timeout.
 +   *
 +   * @param factory
 +   *          Thrift client factory
 +   * @param address
 +   *          Server address for client to connect to
 +   * @param context
 +   *          RPC options
 +   */
 +  public static <T extends TServiceClient> T getClientNoTimeout(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context)
 +      throws TTransportException {
 +    return getClient(factory, address, context, 0);
 +  }
 +
 +  /**
 +   * Create a Thrift client using the given factory with a pooled transport (if available), the address and client context. Client timeout is extracted from the
 +   * ClientContext
 +   *
 +   * @param factory
 +   *          Thrift client factory
 +   * @param address
 +   *          Server address for client to connect to
 +   * @param context
 +   *          RPC options
 +   */
 +  public static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context) throws TTransportException {
 +    TTransport transport = ThriftTransportPool.getInstance().getTransport(address, context.getClientTimeoutInMillis(), context);
 +    return createClient(factory, transport);
 +  }
 +
 +  /**
 +   * Create a Thrift client using the given factory with a pooled transport (if available) using the address, client context and timeou
 +   *
 +   * @param factory
 +   *          Thrift client factory
 +   * @param address
 +   *          Server address for client to connect to
 +   * @param context
 +   *          RPC options
 +   * @param timeout
 +   *          Socket timeout which overrides the ClientContext timeout
 +   */
 +  private static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context, long timeout)
 +      throws TTransportException {
 +    TTransport transport = ThriftTransportPool.getInstance().getTransport(address, timeout, context);
 +    return createClient(factory, transport);
 +  }
 +
 +  /**
 +   * Return the transport used by the client to the shared pool.
 +   *
 +   * @param iface
 +   *          The Client being returned or null.
 +   */
 +  public static void returnClient(TServiceClient iface) { // Eew... the typing here is horrible
 +    if (iface != null) {
 +      ThriftTransportPool.getInstance().returnTransport(iface.getInputProtocol().getTransport());
 +    }
 +  }
 +
 +  /**
 +   * Create a TabletServer Thrift client
 +   *
 +   * @param address
 +   *          Server address for client to connect to
 +   * @param context
 +   *          RPC options
 +   */
 +  public static TabletClientService.Client getTServerClient(HostAndPort address, ClientContext context) throws TTransportException {
 +    return getClient(new TabletClientService.Client.Factory(), address, context);
 +  }
 +
 +  /**
 +   * Create a TabletServer Thrift client
 +   *
 +   * @param address
 +   *          Server address for client to connect to
 +   * @param context
 +   *          Options for connecting to the server
 +   * @param timeout
 +   *          Socket timeout which overrides the ClientContext timeout
 +   */
 +  public static TabletClientService.Client getTServerClient(HostAndPort address, ClientContext context, long timeout) throws TTransportException {
 +    return getClient(new TabletClientService.Client.Factory(), address, context, timeout);
 +  }
 +
 +  /**
 +   * Create a transport that is not pooled
 +   *
 +   * @param address
 +   *          Server address to open the transport to
 +   * @param context
 +   *          RPC options
 +   */
 +  public static TTransport createTransport(HostAndPort address, ClientContext context) throws TException {
 +    return createClientTransport(address, (int) context.getClientTimeoutInMillis(), context.getClientSslParams(), context.getSaslParams());
 +  }
 +
 +  /**
 +   * Get an instance of the TTransportFactory with the provided maximum frame size
 +   *
 +   * @param maxFrameSize
 +   *          Maximum Thrift message frame size
 +   * @return A, possibly cached, TTransportFactory with the requested maximum frame size
 +   */
 +  public static synchronized TTransportFactory transportFactory(int maxFrameSize) {
 +    TTransportFactory factory = factoryCache.get(maxFrameSize);
 +    if (factory == null) {
 +      factory = new TFramedTransport.Factory(maxFrameSize);
 +      factoryCache.put(maxFrameSize, factory);
 +    }
 +    return factory;
 +  }
 +
 +  /**
 +   * @see #transportFactory(int)
 +   */
 +  public static synchronized TTransportFactory transportFactory(long maxFrameSize) {
 +    if (maxFrameSize > Integer.MAX_VALUE || maxFrameSize < 1)
 +      throw new RuntimeException("Thrift transport frames are limited to " + Integer.MAX_VALUE);
 +    return transportFactory((int) maxFrameSize);
 +  }
 +
 +  /**
 +   * Create a TTransport for clients to the given address with the provided socket timeout and session-layer configuration
 +   *
 +   * @param address
 +   *          Server address to connect to
 +   * @param timeout
 +   *          Client socket timeout
 +   * @param sslParams
 +   *          RPC options for SSL servers
 +   * @param saslParams
 +   *          RPC options for SASL servers
 +   * @return An open TTransport which must be closed when finished
 +   */
 +  public static TTransport createClientTransport(HostAndPort address, int timeout, SslConnectionParams sslParams, SaslConnectionParams saslParams)
 +      throws TTransportException {
 +    boolean success = false;
 +    TTransport transport = null;
 +    try {
 +      if (sslParams != null) {
 +        // The check in AccumuloServerContext ensures that servers are brought up with sane configurations, but we also want to validate clients
 +        if (null != saslParams) {
 +          throw new IllegalStateException("Cannot use both SSL and SASL");
 +        }
 +
 +        log.trace("Creating SSL client transport");
 +
 +        // TSSLTransportFactory handles timeout 0 -> forever natively
 +        if (sslParams.useJsse()) {
 +          transport = TSSLTransportFactory.getClientSocket(address.getHostText(), address.getPort(), timeout);
 +        } else {
 +          // JDK6's factory doesn't appear to pass the protocol onto the Socket properly so we have
 +          // to do some magic to make sure that happens. Not an issue in JDK7
 +
 +          // Taken from thrift-0.9.1 to make the SSLContext
 +          SSLContext sslContext = createSSLContext(sslParams);
 +
 +          // Create the factory from it
 +          SSLSocketFactory sslSockFactory = sslContext.getSocketFactory();
 +
 +          // Wrap the real factory with our own that will set the protocol on the Socket before returning it
 +          ProtocolOverridingSSLSocketFactory wrappingSslSockFactory = new ProtocolOverridingSSLSocketFactory(sslSockFactory,
 +              new String[] {sslParams.getClientProtocol()});
 +
 +          // Create the TSocket from that
 +          transport = createClient(wrappingSslSockFactory, address.getHostText(), address.getPort(), timeout);
 +          // TSSLTransportFactory leaves transports open, so no need to open here
 +        }
 +
 +        transport = ThriftUtil.transportFactory().getTransport(transport);
 +      } else if (null != saslParams) {
 +        if (!UserGroupInformation.isSecurityEnabled()) {
 +          throw new IllegalStateException("Expected Kerberos security to be enabled if SASL is in use");
 +        }
 +
 +        log.trace("Creating SASL connection to {}:{}", address.getHostText(), address.getPort());
 +
 +        // Make sure a timeout is set
 +        try {
 +          transport = TTimeoutTransport.create(address, timeout);
 +        } catch (IOException e) {
 +          log.warn("Failed to open transport to {}", address);
 +          throw new TTransportException(e);
 +        }
 +
 +        try {
 +          // Log in via UGI, ensures we have logged in with our KRB credentials
 +          final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
 +
 +          // Is this pricey enough that we want to cache it?
 +          final String hostname = InetAddress.getByName(address.getHostText()).getCanonicalHostName();
 +
 +          final SaslMechanism mechanism = saslParams.getMechanism();
 +
 +          log.trace("Opening transport to server as {} to {}/{} using {}", currentUser, saslParams.getKerberosServerPrimary(), hostname, mechanism);
 +
 +          // Create the client SASL transport using the information for the server
 +          // Despite the 'protocol' argument seeming to be useless, it *must* be the primary of the server being connected to
 +          transport = new TSaslClientTransport(mechanism.getMechanismName(), null, saslParams.getKerberosServerPrimary(), hostname,
 +              saslParams.getSaslProperties(), saslParams.getCallbackHandler(), transport);
 +
 +          // Wrap it all in a processor which will run with a doAs the current user
 +          transport = new UGIAssumingTransport(transport, currentUser);
 +
 +          // Open the transport
 +          transport.open();
 +        } catch (TTransportException e) {
 +          log.warn("Failed to open SASL transport", e);
 +
 +          // We might have had a valid ticket, but it expired. We'll let the caller retry, but we will attempt to re-login to make the next attempt work.
 +          // Sadly, we have no way to determine the actual reason we got this TTransportException other than inspecting the exception msg.
 +          log.debug("Caught TTransportException opening SASL transport, checking if re-login is necessary before propagating the exception.");
 +          attemptClientReLogin();
 +
 +          throw e;
 +        } catch (IOException e) {
 +          log.warn("Failed to open SASL transport", e);
 +          throw new TTransportException(e);
 +        }
 +      } else {
 +        log.trace("Opening normal transport");
 +        if (timeout == 0) {
 +          transport = new TSocket(address.getHostText(), address.getPort());
 +          transport.open();
 +        } else {
 +          try {
 +            transport = TTimeoutTransport.create(address, timeout);
 +          } catch (IOException ex) {
 +            log.warn("Failed to open transport to " + address);
 +            throw new TTransportException(ex);
 +          }
 +
 +          // Open the transport
 +          transport.open();
 +        }
 +        transport = ThriftUtil.transportFactory().getTransport(transport);
 +      }
 +      success = true;
 +    } finally {
 +      if (!success && transport != null) {
 +        transport.close();
 +      }
 +    }
 +    return transport;
 +  }
 +
 +  /**
 +   * Some wonderful snippets of documentation from HBase on performing the re-login client-side (as well as server-side) in the following paragraph. We want to
 +   * attempt a re-login to automatically refresh the client's Krb "credentials" (remember, a server might also be a client, master sending RPC to tserver), but
 +   * we have to take care to avoid Kerberos' replay attack protection.
 +   * <p>
 +   * If multiple clients with the same principal try to connect to the same server at the same time, the server assumes a replay attack is in progress. This is
 +   * a feature of kerberos. In order to work around this, what is done is that the client backs off randomly and tries to initiate the connection again. The
 +   * other problem is to do with ticket expiry. To handle that, a relogin is attempted.
 +   */
 +  static void attemptClientReLogin() {
 +    try {
 +      UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
 +      if (null == loginUser || !loginUser.hasKerberosCredentials()) {
 +        // We should have already checked that we're logged in and have credentials. A precondition-like check.
 +        throw new RuntimeException("Expected to find Kerberos UGI credentials, but did not");
 +      }
 +      UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
 +      // A Proxy user is the "effective user" (in name only), riding on top of the "real user"'s Krb credentials.
 +      UserGroupInformation realUser = currentUser.getRealUser();
 +
 +      // re-login only in case it is the login user or superuser.
 +      if (loginUser.equals(currentUser) || loginUser.equals(realUser)) {
 +        if (UserGroupInformation.isLoginKeytabBased()) {
 +          log.info("Performing keytab-based Kerberos re-login");
 +          loginUser.reloginFromKeytab();
 +        } else {
 +          log.info("Performing ticket-cache-based Kerberos re-login");
 +          loginUser.reloginFromTicketCache();
 +        }
 +
 +        // Avoid the replay attack protection, sleep 1 to 5000ms
 +        try {
 +          Thread.sleep((SASL_BACKOFF_RAND.nextInt(RELOGIN_MAX_BACKOFF) + 1));
 +        } catch (InterruptedException e) {
 +          Thread.currentThread().interrupt();
 +          return;
 +        }
 +      } else {
 +        log.debug("Not attempting Kerberos re-login: loginUser={}, currentUser={}, realUser={}", loginUser, currentUser, realUser);
 +      }
 +    } catch (IOException e) {
 +      // The inability to check is worrisome and deserves a RuntimeException instead of a propagated IO-like Exception.
 +      log.warn("Failed to check (and/or perform) Kerberos client re-login", e);
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  /**
 +   * Lifted from TSSLTransportFactory in Thrift-0.9.1. The method to create a client socket with an SSLContextFactory object is not visibile to us. Have to use
 +   * SslConnectionParams instead of TSSLTransportParameters because no getters exist on TSSLTransportParameters.
 +   *
 +   * @param params
 +   *          Parameters to use to create the SSLContext
 +   */
 +  private static SSLContext createSSLContext(SslConnectionParams params) throws TTransportException {
 +    SSLContext ctx;
 +    try {
 +      ctx = SSLContext.getInstance(params.getClientProtocol());
 +      TrustManagerFactory tmf = null;
 +      KeyManagerFactory kmf = null;
 +
 +      if (params.isTrustStoreSet()) {
 +        tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
 +        KeyStore ts = KeyStore.getInstance(params.getTrustStoreType());
 +        try (FileInputStream fis = new FileInputStream(params.getTrustStorePath())) {
 +          ts.load(fis, params.getTrustStorePass().toCharArray());
 +        }
 +        tmf.init(ts);
 +      }
 +
 +      if (params.isKeyStoreSet()) {
 +        kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
 +        KeyStore ks = KeyStore.getInstance(params.getKeyStoreType());
 +        try (FileInputStream fis = new FileInputStream(params.getKeyStorePath())) {
 +          ks.load(fis, params.getKeyStorePass().toCharArray());
 +        }
 +        kmf.init(ks, params.getKeyStorePass().toCharArray());
 +      }
 +
 +      if (params.isKeyStoreSet() && params.isTrustStoreSet()) {
 +        ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
 +      } else if (params.isKeyStoreSet()) {
 +        ctx.init(kmf.getKeyManagers(), null, null);
 +      } else {
 +        ctx.init(null, tmf.getTrustManagers(), null);
 +      }
 +
 +    } catch (Exception e) {
 +      throw new TTransportException("Error creating the transport", e);
 +    }
 +    return ctx;
 +  }
 +
 +  /**
 +   * Lifted from Thrift-0.9.1 because it was private. Create an SSLSocket with the given factory, host:port, and timeout.
 +   *
 +   * @param factory
 +   *          Factory to create the socket from
 +   * @param host
 +   *          Destination host
 +   * @param port
 +   *          Destination port
 +   * @param timeout
 +   *          Socket timeout
 +   */
 +  private static TSocket createClient(SSLSocketFactory factory, String host, int port, int timeout) throws TTransportException {
++    SSLSocket socket = null;
 +    try {
-       SSLSocket socket = (SSLSocket) factory.createSocket(host, port);
++      socket = (SSLSocket) factory.createSocket(host, port);
 +      socket.setSoTimeout(timeout);
 +      return new TSocket(socket);
 +    } catch (Exception e) {
++      try {
++        if (socket != null)
++          socket.close();
++      } catch (IOException ioe) {}
++
 +      throw new TTransportException("Could not connect to " + host + " on port " + port, e);
 +    }
 +  }
 +}


[24/28] accumulo git commit: Merge remote-tracking branch 'origin/1.8' into 1.8

Posted by el...@apache.org.
Merge remote-tracking branch 'origin/1.8' into 1.8


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/af802612
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/af802612
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/af802612

Branch: refs/heads/1.8
Commit: af802612965e903a7bfa034bc2afaa02e21ad735
Parents: a24a286 94bf129
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 23:33:11 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 23:33:11 2016 -0400

----------------------------------------------------------------------
 .../apache/accumulo/core/conf/PropertyType.java |  3 -
 .../core/metadata/MetadataLocationObtainer.java | 21 ++++---
 .../core/client/impl/ScannerOptionsTest.java    | 58 +++++++++++---------
 pom.xml                                         |  2 +-
 .../accumulo/server/rpc/TServerUtils.java       |  4 +-
 .../BaseHostRegexTableLoadBalancerTest.java     |  7 ++-
 .../accumulo/server/util/TServerUtilsTest.java  |  5 ++
 7 files changed, 58 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/af802612/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
----------------------------------------------------------------------


[20/28] accumulo git commit: Merge branch '1.6' into 1.7

Posted by el...@apache.org.
Merge branch '1.6' into 1.7

Conflicts:
	server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/78a56a22
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/78a56a22
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/78a56a22

Branch: refs/heads/1.7
Commit: 78a56a22c92d1738961022646677b76b126e955c
Parents: f81a22e f66b9a0
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 23:32:18 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 23:32:18 2016 -0400

----------------------------------------------------------------------
 pom.xml                                                     | 2 +-
 .../apache/accumulo/gc/GarbageCollectWriteAheadLogs.java    | 9 ++++-----
 2 files changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/78a56a22/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 20484ce,9b67614..27bed62
--- a/pom.xml
+++ b/pom.xml
@@@ -159,14 -156,9 +159,14 @@@
          <version>1.32</version>
        </dependency>
        <dependency>
 +        <groupId>com.google.auto.service</groupId>
 +        <artifactId>auto-service</artifactId>
 +        <version>1.0-rc2</version>
 +      </dependency>
 +      <dependency>
          <groupId>com.google.code.gson</groupId>
          <artifactId>gson</artifactId>
-         <version>2.2.2</version>
+         <version>2.2.4</version>
        </dependency>
        <!-- Hadoop-2.4.0 MiniDFSCluster uses classes dropped in Guava 15 -->
        <dependency>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/78a56a22/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index a62ffb2,a22a34e..b57b8fc
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@@ -28,63 -27,44 +27,63 @@@ import java.util.Map
  import java.util.Map.Entry;
  import java.util.Set;
  import java.util.UUID;
+ import java.util.concurrent.TimeUnit;
  
  import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Connector;
  import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
+ import org.apache.accumulo.core.conf.AccumuloConfiguration;
+ import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.gc.thrift.GCStatus;
  import org.apache.accumulo.core.gc.thrift.GcCycleStats;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 +import org.apache.accumulo.core.protobuf.ProtobufUtil;
 +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 +import org.apache.accumulo.core.replication.ReplicationTable;
 +import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 +import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.security.Authorizations;
  import org.apache.accumulo.core.tabletserver.log.LogEntry;
  import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
  import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 +import org.apache.accumulo.core.trace.Span;
 +import org.apache.accumulo.core.trace.Trace;
 +import org.apache.accumulo.core.trace.Tracer;
  import org.apache.accumulo.core.util.AddressUtil;
 -import org.apache.accumulo.core.util.ThriftUtil;
  import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.server.AccumuloServerContext;
  import org.apache.accumulo.server.ServerConstants;
 -import org.apache.accumulo.server.conf.ServerConfiguration;
  import org.apache.accumulo.server.fs.VolumeManager;
 -import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.replication.StatusUtil;
 +import org.apache.accumulo.server.replication.proto.Replication.Status;
  import org.apache.accumulo.server.util.MetadataTableUtil;
  import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 -import org.apache.accumulo.trace.instrument.Span;
 -import org.apache.accumulo.trace.instrument.Trace;
 -import org.apache.accumulo.trace.instrument.Tracer;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.Path;
 -import org.apache.log4j.Logger;
  import org.apache.thrift.TException;
  import org.apache.zookeeper.KeeperException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  
+ import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.Iterables;
  import com.google.common.net.HostAndPort;
 +import com.google.protobuf.InvalidProtocolBufferException;
- import java.util.concurrent.TimeUnit;
- import org.apache.accumulo.core.conf.AccumuloConfiguration;
- import org.apache.accumulo.core.conf.Property;
  
  public class GarbageCollectWriteAheadLogs {
 -  private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class);
 +  private static final Logger log = LoggerFactory.getLogger(GarbageCollectWriteAheadLogs.class);
  
 -  private final Instance instance;
 +  private final AccumuloServerContext context;
    private final VolumeManager fs;
    private final Map<HostAndPort,Long> firstSeenDead = new HashMap<HostAndPort,Long>();
-   private AccumuloConfiguration config;
  
    private boolean useTrash;
  


[18/28] accumulo git commit: Merge branch '1.6' into 1.7

Posted by el...@apache.org.
Merge branch '1.6' into 1.7

Conflicts:
	server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/78a56a22
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/78a56a22
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/78a56a22

Branch: refs/heads/1.8
Commit: 78a56a22c92d1738961022646677b76b126e955c
Parents: f81a22e f66b9a0
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 23:32:18 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 23:32:18 2016 -0400

----------------------------------------------------------------------
 pom.xml                                                     | 2 +-
 .../apache/accumulo/gc/GarbageCollectWriteAheadLogs.java    | 9 ++++-----
 2 files changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/78a56a22/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 20484ce,9b67614..27bed62
--- a/pom.xml
+++ b/pom.xml
@@@ -159,14 -156,9 +159,14 @@@
          <version>1.32</version>
        </dependency>
        <dependency>
 +        <groupId>com.google.auto.service</groupId>
 +        <artifactId>auto-service</artifactId>
 +        <version>1.0-rc2</version>
 +      </dependency>
 +      <dependency>
          <groupId>com.google.code.gson</groupId>
          <artifactId>gson</artifactId>
-         <version>2.2.2</version>
+         <version>2.2.4</version>
        </dependency>
        <!-- Hadoop-2.4.0 MiniDFSCluster uses classes dropped in Guava 15 -->
        <dependency>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/78a56a22/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index a62ffb2,a22a34e..b57b8fc
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@@ -28,63 -27,44 +27,63 @@@ import java.util.Map
  import java.util.Map.Entry;
  import java.util.Set;
  import java.util.UUID;
+ import java.util.concurrent.TimeUnit;
  
  import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Connector;
  import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
+ import org.apache.accumulo.core.conf.AccumuloConfiguration;
+ import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.gc.thrift.GCStatus;
  import org.apache.accumulo.core.gc.thrift.GcCycleStats;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 +import org.apache.accumulo.core.protobuf.ProtobufUtil;
 +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 +import org.apache.accumulo.core.replication.ReplicationTable;
 +import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 +import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.security.Authorizations;
  import org.apache.accumulo.core.tabletserver.log.LogEntry;
  import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
  import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 +import org.apache.accumulo.core.trace.Span;
 +import org.apache.accumulo.core.trace.Trace;
 +import org.apache.accumulo.core.trace.Tracer;
  import org.apache.accumulo.core.util.AddressUtil;
 -import org.apache.accumulo.core.util.ThriftUtil;
  import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.server.AccumuloServerContext;
  import org.apache.accumulo.server.ServerConstants;
 -import org.apache.accumulo.server.conf.ServerConfiguration;
  import org.apache.accumulo.server.fs.VolumeManager;
 -import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.replication.StatusUtil;
 +import org.apache.accumulo.server.replication.proto.Replication.Status;
  import org.apache.accumulo.server.util.MetadataTableUtil;
  import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 -import org.apache.accumulo.trace.instrument.Span;
 -import org.apache.accumulo.trace.instrument.Trace;
 -import org.apache.accumulo.trace.instrument.Tracer;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.Path;
 -import org.apache.log4j.Logger;
  import org.apache.thrift.TException;
  import org.apache.zookeeper.KeeperException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  
+ import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.Iterables;
  import com.google.common.net.HostAndPort;
 +import com.google.protobuf.InvalidProtocolBufferException;
- import java.util.concurrent.TimeUnit;
- import org.apache.accumulo.core.conf.AccumuloConfiguration;
- import org.apache.accumulo.core.conf.Property;
  
  public class GarbageCollectWriteAheadLogs {
 -  private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class);
 +  private static final Logger log = LoggerFactory.getLogger(GarbageCollectWriteAheadLogs.class);
  
 -  private final Instance instance;
 +  private final AccumuloServerContext context;
    private final VolumeManager fs;
    private final Map<HostAndPort,Long> firstSeenDead = new HashMap<HostAndPort,Long>();
-   private AccumuloConfiguration config;
  
    private boolean useTrash;
  


[14/28] accumulo git commit: Merge remote-tracking branch 'origin/1.6' into 1.6

Posted by el...@apache.org.
Merge remote-tracking branch 'origin/1.6' into 1.6


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f66b9a0d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f66b9a0d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f66b9a0d

Branch: refs/heads/1.8
Commit: f66b9a0d18cde88bdc269421527ebc5a35445dda
Parents: 561f189 31a2224
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 23:28:49 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 23:28:49 2016 -0400

----------------------------------------------------------------------
 pom.xml                                                       | 2 +-
 .../org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java  | 7 +++----
 2 files changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------



[13/28] accumulo git commit: ACCUMULO-4317 Refactor TTimeoutTransport to be able to better test it

Posted by el...@apache.org.
ACCUMULO-4317 Refactor TTimeoutTransport to be able to better test it


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7bd9c088
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7bd9c088
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7bd9c088

Branch: refs/heads/master
Commit: 7bd9c0882efec7d42b83870b3cc3436471359472
Parents: bfc2a5b
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 22:44:55 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 22:44:55 2016 -0400

----------------------------------------------------------------------
 .../accumulo/core/rpc/TTimeoutTransport.java    | 146 ++++++++++++++++--
 .../core/rpc/TTimeoutTransportTest.java         | 151 +++++++++++++++++++
 2 files changed, 286 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bd9c088/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
index cc3f51b..809975f 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
@@ -30,16 +30,31 @@ import java.nio.channels.spi.SelectorProvider;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.thrift.transport.TIOStreamTransport;
 import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.net.HostAndPort;
 
+/**
+ * A utility class for setting up a {@link TTransport} with various necessary configurations for ideal performance in Accumulo. These configurations include:
+ * <ul>
+ * <li>Setting SO_LINGER=false on the socket.</li>
+ * <li>Setting TCP_NO_DELAY=true on the socket.</li>
+ * <li>Setting timeouts on the I/OStreams.</li>
+ * </ul>
+ */
 public class TTimeoutTransport {
+  private static final Logger log = LoggerFactory.getLogger(TTimeoutTransport.class);
+
+  private static final TTimeoutTransport INSTANCE = new TTimeoutTransport();
+
+  private volatile Method GET_INPUT_STREAM_METHOD = null;
 
-  private static volatile Method GET_INPUT_STREAM_METHOD = null;
+  private TTimeoutTransport() {}
 
-  private static Method getNetUtilsInputStreamMethod() {
+  private Method getNetUtilsInputStreamMethod() {
     if (null == GET_INPUT_STREAM_METHOD) {
-      synchronized (TTimeoutTransport.class) {
+      synchronized (this) {
         if (null == GET_INPUT_STREAM_METHOD) {
           try {
             GET_INPUT_STREAM_METHOD = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE);
@@ -53,35 +68,144 @@ public class TTimeoutTransport {
     return GET_INPUT_STREAM_METHOD;
   }
 
-  private static InputStream getInputStream(Socket socket, long timeout) {
+  /**
+   * Invokes the <code>NetUtils.getInputStream(Socket, long)</code> using reflection to handle compatibility with both Hadoop 1 and 2.
+   *
+   * @param socket
+   *          The socket to create the input stream on
+   * @param timeout
+   *          The timeout for the input stream in milliseconds
+   * @return An InputStream on the socket
+   */
+  private InputStream getInputStream(Socket socket, long timeout) throws IOException {
     try {
       return (InputStream) getNetUtilsInputStreamMethod().invoke(null, socket, timeout);
     } catch (Exception e) {
-      throw new RuntimeException(e);
+      Throwable cause = e.getCause();
+      // Try to re-throw the IOException directly
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      }
+
+      if (e instanceof RuntimeException) {
+        // Don't re-wrap another RTE around an RTE
+        throw (RuntimeException) e;
+      } else {
+        throw new RuntimeException(e);
+      }
     }
   }
 
+  /**
+   * Creates a Thrift TTransport to the given address with the given timeout. All created resources are closed if an exception is thrown.
+   *
+   * @param addr
+   *          The address to connect the client to
+   * @param timeoutMillis
+   *          The timeout in milliseconds for the connection
+   * @return A TTransport connected to the given <code>addr</code>
+   * @throws IOException
+   *           If the transport fails to be created/connected
+   */
   public static TTransport create(HostAndPort addr, long timeoutMillis) throws IOException {
-    return create(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis);
+    return INSTANCE.createInternal(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis);
   }
 
+  /**
+   * Creates a Thrift TTransport to the given address with the given timeout. All created resources are closed if an exception is thrown.
+   *
+   * @param addr
+   *          The address to connect the client to
+   * @param timeoutMillis
+   *          The timeout in milliseconds for the connection
+   * @return A TTransport connected to the given <code>addr</code>
+   * @throws IOException
+   *           If the transport fails to be created/connected
+   */
   public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException {
+    return INSTANCE.createInternal(addr, timeoutMillis);
+  }
+
+  /**
+   * Opens a socket to the given <code>addr</code>, configures the socket, and then creates a Thrift transport using the socket.
+   *
+   * @param addr
+   *          The address the socket should connect
+   * @param timeoutMillis
+   *          The socket timeout in milliseconds
+   * @return A TTransport instance to the given <code>addr</code>
+   * @throws IOException
+   *           If the Thrift client is failed to be connected/created
+   */
+  protected TTransport createInternal(SocketAddress addr, long timeoutMillis) throws IOException {
     Socket socket = null;
     try {
-      socket = SelectorProvider.provider().openSocketChannel().socket();
+      socket = openSocket(addr);
+    } catch (IOException e) {
+      // openSocket handles closing the Socket on error
+      throw e;
+    }
+
+    // Should be non-null
+    assert null != socket;
+
+    // Set up the streams
+    try {
+      InputStream input = wrapInputStream(socket, timeoutMillis);
+      OutputStream output = wrapOutputStream(socket, timeoutMillis);
+      return new TIOStreamTransport(input, output);
+    } catch (IOException e) {
+      try {
+        socket.close();
+      } catch (IOException ioe) {
+        log.error("Failed to close socket after unsuccessful I/O stream setup", e);
+      }
+
+      throw e;
+    }
+  }
+
+  // Visible for testing
+  protected InputStream wrapInputStream(Socket socket, long timeoutMillis) throws IOException {
+    return new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
+  }
+
+  // Visible for testing
+  protected OutputStream wrapOutputStream(Socket socket, long timeoutMillis) throws IOException {
+    return new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
+  }
+
+  /**
+   * Opens and configures a {@link Socket} for Accumulo RPC.
+   *
+   * @param addr
+   *          The address to connect the socket to
+   * @return A socket connected to the given address, or null if the socket fails to connect
+   */
+  protected Socket openSocket(SocketAddress addr) throws IOException {
+    Socket socket = null;
+    try {
+      socket = openSocketChannel();
       socket.setSoLinger(false, 0);
       socket.setTcpNoDelay(true);
       socket.connect(addr);
-      InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
-      OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
-      return new TIOStreamTransport(input, output);
+      return socket;
     } catch (IOException e) {
       try {
         if (socket != null)
           socket.close();
-      } catch (IOException ioe) {}
+      } catch (IOException ioe) {
+        log.error("Failed to close socket after unsuccessful open.", e);
+      }
 
       throw e;
     }
   }
+
+  /**
+   * Opens a socket channel and returns the underlying socket.
+   */
+  protected Socket openSocketChannel() throws IOException {
+    return SelectorProvider.provider().openSocketChannel().socket();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bd9c088/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java b/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java
new file mode 100644
index 0000000..cedac9c
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.net.SocketAddress;
+
+import org.junit.Test;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link TTimeoutTransport}.
+ */
+public class TTimeoutTransportTest {
+
+  void expectedSocketSetup(Socket s) throws IOException {
+    s.setSoLinger(false, 0);
+    expectLastCall().once();
+    s.setTcpNoDelay(true);
+    expectLastCall().once();
+  }
+
+  @Test
+  public void testFailedSocketOpenIsClosed() throws IOException {
+    SocketAddress addr = createMock(SocketAddress.class);
+    Socket s = createMock(Socket.class);
+    TTimeoutTransport timeoutTransport = createMockBuilder(TTimeoutTransport.class).addMockedMethod("openSocketChannel").createMock();
+
+    // Return out mocked socket
+    expect(timeoutTransport.openSocketChannel()).andReturn(s).once();
+
+    // tcpnodelay and solinger
+    expectedSocketSetup(s);
+
+    // Connect to the addr
+    s.connect(addr);
+    expectLastCall().andThrow(new IOException());
+
+    // The socket should be closed after the above IOException
+    s.close();
+
+    replay(addr, s, timeoutTransport);
+
+    try {
+      timeoutTransport.openSocket(addr);
+      fail("Expected to catch IOException but got none");
+    } catch (IOException e) {
+      // Expected
+    }
+
+    verify(addr, s, timeoutTransport);
+  }
+
+  @Test
+  public void testFailedInputStreamClosesSocket() throws IOException {
+    long timeout = 2 * 60 * 1000; // 2 mins
+    SocketAddress addr = createMock(SocketAddress.class);
+    Socket s = createMock(Socket.class);
+    TTimeoutTransport timeoutTransport = createMockBuilder(TTimeoutTransport.class).addMockedMethod("openSocketChannel").addMockedMethod("wrapInputStream")
+        .createMock();
+
+    // Return out mocked socket
+    expect(timeoutTransport.openSocketChannel()).andReturn(s).once();
+
+    // tcpnodelay and solinger
+    expectedSocketSetup(s);
+
+    // Connect to the addr
+    s.connect(addr);
+    expectLastCall().once();
+
+    expect(timeoutTransport.wrapInputStream(s, timeout)).andThrow(new IOException());
+
+    // The socket should be closed after the above IOException
+    s.close();
+
+    replay(addr, s, timeoutTransport);
+
+    try {
+      timeoutTransport.createInternal(addr, timeout);
+      fail("Expected to catch IOException but got none");
+    } catch (IOException e) {
+      // Expected
+    }
+
+    verify(addr, s, timeoutTransport);
+  }
+
+  @Test
+  public void testFailedOutputStreamClosesSocket() throws IOException {
+    long timeout = 2 * 60 * 1000; // 2 mins
+    SocketAddress addr = createMock(SocketAddress.class);
+    Socket s = createMock(Socket.class);
+    InputStream is = createMock(InputStream.class);
+    TTimeoutTransport timeoutTransport = createMockBuilder(TTimeoutTransport.class).addMockedMethod("openSocketChannel").addMockedMethod("wrapInputStream")
+        .addMockedMethod("wrapOutputStream").createMock();
+
+    // Return out mocked socket
+    expect(timeoutTransport.openSocketChannel()).andReturn(s).once();
+
+    // tcpnodelay and solinger
+    expectedSocketSetup(s);
+
+    // Connect to the addr
+    s.connect(addr);
+    expectLastCall().once();
+
+    // Input stream is set up
+    expect(timeoutTransport.wrapInputStream(s, timeout)).andReturn(is);
+    // Output stream fails to be set up
+    expect(timeoutTransport.wrapOutputStream(s, timeout)).andThrow(new IOException());
+
+    // The socket should be closed after the above IOException
+    s.close();
+
+    replay(addr, s, timeoutTransport);
+
+    try {
+      timeoutTransport.createInternal(addr, timeout);
+      fail("Expected to catch IOException but got none");
+    } catch (IOException e) {
+      // Expected
+    }
+
+    verify(addr, s, timeoutTransport);
+  }
+
+}


[21/28] accumulo git commit: Merge remote-tracking branch 'origin/1.7' into 1.7

Posted by el...@apache.org.
Merge remote-tracking branch 'origin/1.7' into 1.7


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d3e687e9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d3e687e9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d3e687e9

Branch: refs/heads/1.8
Commit: d3e687e93f1a4d01ca3038c52896ac5d9a2c4af1
Parents: 78a56a2 976c0aa
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 23:32:55 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 23:32:55 2016 -0400

----------------------------------------------------------------------
 .../master/balancer/BaseHostRegexTableLoadBalancerTest.java   | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[19/28] accumulo git commit: Merge branch '1.6' into 1.7

Posted by el...@apache.org.
Merge branch '1.6' into 1.7

Conflicts:
	server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/78a56a22
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/78a56a22
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/78a56a22

Branch: refs/heads/master
Commit: 78a56a22c92d1738961022646677b76b126e955c
Parents: f81a22e f66b9a0
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 23:32:18 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 23:32:18 2016 -0400

----------------------------------------------------------------------
 pom.xml                                                     | 2 +-
 .../apache/accumulo/gc/GarbageCollectWriteAheadLogs.java    | 9 ++++-----
 2 files changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/78a56a22/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 20484ce,9b67614..27bed62
--- a/pom.xml
+++ b/pom.xml
@@@ -159,14 -156,9 +159,14 @@@
          <version>1.32</version>
        </dependency>
        <dependency>
 +        <groupId>com.google.auto.service</groupId>
 +        <artifactId>auto-service</artifactId>
 +        <version>1.0-rc2</version>
 +      </dependency>
 +      <dependency>
          <groupId>com.google.code.gson</groupId>
          <artifactId>gson</artifactId>
-         <version>2.2.2</version>
+         <version>2.2.4</version>
        </dependency>
        <!-- Hadoop-2.4.0 MiniDFSCluster uses classes dropped in Guava 15 -->
        <dependency>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/78a56a22/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index a62ffb2,a22a34e..b57b8fc
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@@ -28,63 -27,44 +27,63 @@@ import java.util.Map
  import java.util.Map.Entry;
  import java.util.Set;
  import java.util.UUID;
+ import java.util.concurrent.TimeUnit;
  
  import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Connector;
  import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
+ import org.apache.accumulo.core.conf.AccumuloConfiguration;
+ import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.gc.thrift.GCStatus;
  import org.apache.accumulo.core.gc.thrift.GcCycleStats;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 +import org.apache.accumulo.core.protobuf.ProtobufUtil;
 +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 +import org.apache.accumulo.core.replication.ReplicationTable;
 +import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 +import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.security.Authorizations;
  import org.apache.accumulo.core.tabletserver.log.LogEntry;
  import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
  import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 +import org.apache.accumulo.core.trace.Span;
 +import org.apache.accumulo.core.trace.Trace;
 +import org.apache.accumulo.core.trace.Tracer;
  import org.apache.accumulo.core.util.AddressUtil;
 -import org.apache.accumulo.core.util.ThriftUtil;
  import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.server.AccumuloServerContext;
  import org.apache.accumulo.server.ServerConstants;
 -import org.apache.accumulo.server.conf.ServerConfiguration;
  import org.apache.accumulo.server.fs.VolumeManager;
 -import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.replication.StatusUtil;
 +import org.apache.accumulo.server.replication.proto.Replication.Status;
  import org.apache.accumulo.server.util.MetadataTableUtil;
  import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 -import org.apache.accumulo.trace.instrument.Span;
 -import org.apache.accumulo.trace.instrument.Trace;
 -import org.apache.accumulo.trace.instrument.Tracer;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.Path;
 -import org.apache.log4j.Logger;
  import org.apache.thrift.TException;
  import org.apache.zookeeper.KeeperException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  
+ import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.Iterables;
  import com.google.common.net.HostAndPort;
 +import com.google.protobuf.InvalidProtocolBufferException;
- import java.util.concurrent.TimeUnit;
- import org.apache.accumulo.core.conf.AccumuloConfiguration;
- import org.apache.accumulo.core.conf.Property;
  
  public class GarbageCollectWriteAheadLogs {
 -  private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class);
 +  private static final Logger log = LoggerFactory.getLogger(GarbageCollectWriteAheadLogs.class);
  
 -  private final Instance instance;
 +  private final AccumuloServerContext context;
    private final VolumeManager fs;
    private final Map<HostAndPort,Long> firstSeenDead = new HashMap<HostAndPort,Long>();
-   private AccumuloConfiguration config;
  
    private boolean useTrash;
  


[15/28] accumulo git commit: Merge remote-tracking branch 'origin/1.6' into 1.6

Posted by el...@apache.org.
Merge remote-tracking branch 'origin/1.6' into 1.6


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f66b9a0d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f66b9a0d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f66b9a0d

Branch: refs/heads/1.7
Commit: f66b9a0d18cde88bdc269421527ebc5a35445dda
Parents: 561f189 31a2224
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 23:28:49 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 23:28:49 2016 -0400

----------------------------------------------------------------------
 pom.xml                                                       | 2 +-
 .../org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java  | 7 +++----
 2 files changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------



[09/28] accumulo git commit: Merge branch '1.7' into 1.8

Posted by el...@apache.org.
Merge branch '1.7' into 1.8


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/670f2ea8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/670f2ea8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/670f2ea8

Branch: refs/heads/master
Commit: 670f2ea8ade31746194417f4774b7f522023404f
Parents: 519e5b7 f81a22e
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 18:22:49 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 18:22:49 2016 -0400

----------------------------------------------------------------------
 .../accumulo/core/rpc/TTimeoutTransport.java    | 24 ++++++++++++++------
 .../apache/accumulo/core/rpc/ThriftUtil.java    |  8 ++++++-
 2 files changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------



[06/28] accumulo git commit: Merge branch '1.6' into 1.7

Posted by el...@apache.org.
Merge branch '1.6' into 1.7


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f81a22ec
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f81a22ec
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f81a22ec

Branch: refs/heads/1.7
Commit: f81a22ecb05e079eeb558705fd95740c9253c79e
Parents: 7b92053 561f189
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 18:22:41 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 18:22:41 2016 -0400

----------------------------------------------------------------------
 .../accumulo/core/rpc/TTimeoutTransport.java    | 24 ++++++++++++++------
 .../apache/accumulo/core/rpc/ThriftUtil.java    |  8 ++++++-
 2 files changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f81a22ec/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
index 400d90a,0000000..cc3f51b
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
@@@ -1,77 -1,0 +1,87 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.rpc;
 +
 +import java.io.BufferedInputStream;
 +import java.io.BufferedOutputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.OutputStream;
 +import java.lang.reflect.Method;
 +import java.net.InetSocketAddress;
 +import java.net.Socket;
 +import java.net.SocketAddress;
 +import java.nio.channels.spi.SelectorProvider;
 +
 +import org.apache.hadoop.net.NetUtils;
 +import org.apache.thrift.transport.TIOStreamTransport;
 +import org.apache.thrift.transport.TTransport;
 +
 +import com.google.common.net.HostAndPort;
 +
 +public class TTimeoutTransport {
 +
 +  private static volatile Method GET_INPUT_STREAM_METHOD = null;
 +
 +  private static Method getNetUtilsInputStreamMethod() {
 +    if (null == GET_INPUT_STREAM_METHOD) {
 +      synchronized (TTimeoutTransport.class) {
 +        if (null == GET_INPUT_STREAM_METHOD) {
 +          try {
 +            GET_INPUT_STREAM_METHOD = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE);
 +          } catch (Exception e) {
 +            throw new RuntimeException(e);
 +          }
 +        }
 +      }
 +    }
 +
 +    return GET_INPUT_STREAM_METHOD;
 +  }
 +
 +  private static InputStream getInputStream(Socket socket, long timeout) {
 +    try {
 +      return (InputStream) getNetUtilsInputStreamMethod().invoke(null, socket, timeout);
 +    } catch (Exception e) {
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  public static TTransport create(HostAndPort addr, long timeoutMillis) throws IOException {
 +    return create(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis);
 +  }
 +
 +  public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException {
-     Socket socket = SelectorProvider.provider().openSocketChannel().socket();
-     socket.setSoLinger(false, 0);
-     socket.setTcpNoDelay(true);
-     socket.connect(addr);
-     InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
-     OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
-     return new TIOStreamTransport(input, output);
++    Socket socket = null;
++    try {
++      socket = SelectorProvider.provider().openSocketChannel().socket();
++      socket.setSoLinger(false, 0);
++      socket.setTcpNoDelay(true);
++      socket.connect(addr);
++      InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
++      OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
++      return new TIOStreamTransport(input, output);
++    } catch (IOException e) {
++      try {
++        if (socket != null)
++          socket.close();
++      } catch (IOException ioe) {}
++
++      throw e;
++    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f81a22ec/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index f3cb9b5,0000000..be4238e
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@@ -1,457 -1,0 +1,463 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.rpc;
 +
 +import java.io.FileInputStream;
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.security.KeyStore;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.Random;
 +
 +import javax.net.ssl.KeyManagerFactory;
 +import javax.net.ssl.SSLContext;
 +import javax.net.ssl.SSLSocket;
 +import javax.net.ssl.SSLSocketFactory;
 +import javax.net.ssl.TrustManagerFactory;
 +
 +import org.apache.accumulo.core.client.impl.ClientContext;
 +import org.apache.accumulo.core.client.impl.ThriftTransportPool;
 +import org.apache.accumulo.core.rpc.SaslConnectionParams.SaslMechanism;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.hadoop.security.UserGroupInformation;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.TServiceClient;
 +import org.apache.thrift.TServiceClientFactory;
 +import org.apache.thrift.protocol.TProtocolFactory;
 +import org.apache.thrift.transport.TFramedTransport;
 +import org.apache.thrift.transport.TSSLTransportFactory;
 +import org.apache.thrift.transport.TSaslClientTransport;
 +import org.apache.thrift.transport.TSocket;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +import org.apache.thrift.transport.TTransportFactory;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.net.HostAndPort;
 +
 +/**
 + * Factory methods for creating Thrift client objects
 + */
 +public class ThriftUtil {
 +  private static final Logger log = LoggerFactory.getLogger(ThriftUtil.class);
 +
 +  private static final TraceProtocolFactory protocolFactory = new TraceProtocolFactory();
 +  private static final TFramedTransport.Factory transportFactory = new TFramedTransport.Factory(Integer.MAX_VALUE);
 +  private static final Map<Integer,TTransportFactory> factoryCache = new HashMap<Integer,TTransportFactory>();
 +
 +  public static final String GSSAPI = "GSSAPI", DIGEST_MD5 = "DIGEST-MD5";
 +
 +  private static final Random SASL_BACKOFF_RAND = new Random();
 +  private static final int RELOGIN_MAX_BACKOFF = 5000;
 +
 +  /**
 +   * An instance of {@link TraceProtocolFactory}
 +   *
 +   * @return The default Thrift TProtocolFactory for RPC
 +   */
 +  public static TProtocolFactory protocolFactory() {
 +    return protocolFactory;
 +  }
 +
 +  /**
 +   * An instance of {@link org.apache.thrift.transport.TFramedTransport.Factory}
 +   *
 +   * @return The default Thrift TTransportFactory for RPC
 +   */
 +  public static TTransportFactory transportFactory() {
 +    return transportFactory;
 +  }
 +
 +  /**
 +   * Create a Thrift client using the given factory and transport
 +   */
 +  public static <T extends TServiceClient> T createClient(TServiceClientFactory<T> factory, TTransport transport) {
 +    return factory.getClient(protocolFactory.getProtocol(transport), protocolFactory.getProtocol(transport));
 +  }
 +
 +  /**
 +   * Create a Thrift client using the given factory with a pooled transport (if available), the address, and client context with no timeout.
 +   *
 +   * @param factory
 +   *          Thrift client factory
 +   * @param address
 +   *          Server address for client to connect to
 +   * @param context
 +   *          RPC options
 +   */
 +  public static <T extends TServiceClient> T getClientNoTimeout(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context)
 +      throws TTransportException {
 +    return getClient(factory, address, context, 0);
 +  }
 +
 +  /**
 +   * Create a Thrift client using the given factory with a pooled transport (if available), the address and client context. Client timeout is extracted from the
 +   * ClientContext
 +   *
 +   * @param factory
 +   *          Thrift client factory
 +   * @param address
 +   *          Server address for client to connect to
 +   * @param context
 +   *          RPC options
 +   */
 +  public static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context) throws TTransportException {
 +    TTransport transport = ThriftTransportPool.getInstance().getTransport(address, context.getClientTimeoutInMillis(), context);
 +    return createClient(factory, transport);
 +  }
 +
 +  /**
 +   * Create a Thrift client using the given factory with a pooled transport (if available) using the address, client context and timeou
 +   *
 +   * @param factory
 +   *          Thrift client factory
 +   * @param address
 +   *          Server address for client to connect to
 +   * @param context
 +   *          RPC options
 +   * @param timeout
 +   *          Socket timeout which overrides the ClientContext timeout
 +   */
 +  private static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context, long timeout)
 +      throws TTransportException {
 +    TTransport transport = ThriftTransportPool.getInstance().getTransport(address, timeout, context);
 +    return createClient(factory, transport);
 +  }
 +
 +  /**
 +   * Return the transport used by the client to the shared pool.
 +   *
 +   * @param iface
 +   *          The Client being returned or null.
 +   */
 +  public static void returnClient(TServiceClient iface) { // Eew... the typing here is horrible
 +    if (iface != null) {
 +      ThriftTransportPool.getInstance().returnTransport(iface.getInputProtocol().getTransport());
 +    }
 +  }
 +
 +  /**
 +   * Create a TabletServer Thrift client
 +   *
 +   * @param address
 +   *          Server address for client to connect to
 +   * @param context
 +   *          RPC options
 +   */
 +  public static TabletClientService.Client getTServerClient(HostAndPort address, ClientContext context) throws TTransportException {
 +    return getClient(new TabletClientService.Client.Factory(), address, context);
 +  }
 +
 +  /**
 +   * Create a TabletServer Thrift client
 +   *
 +   * @param address
 +   *          Server address for client to connect to
 +   * @param context
 +   *          Options for connecting to the server
 +   * @param timeout
 +   *          Socket timeout which overrides the ClientContext timeout
 +   */
 +  public static TabletClientService.Client getTServerClient(HostAndPort address, ClientContext context, long timeout) throws TTransportException {
 +    return getClient(new TabletClientService.Client.Factory(), address, context, timeout);
 +  }
 +
 +  /**
 +   * Create a transport that is not pooled
 +   *
 +   * @param address
 +   *          Server address to open the transport to
 +   * @param context
 +   *          RPC options
 +   */
 +  public static TTransport createTransport(HostAndPort address, ClientContext context) throws TException {
 +    return createClientTransport(address, (int) context.getClientTimeoutInMillis(), context.getClientSslParams(), context.getSaslParams());
 +  }
 +
 +  /**
 +   * Get an instance of the TTransportFactory with the provided maximum frame size
 +   *
 +   * @param maxFrameSize
 +   *          Maximum Thrift message frame size
 +   * @return A, possibly cached, TTransportFactory with the requested maximum frame size
 +   */
 +  public static synchronized TTransportFactory transportFactory(int maxFrameSize) {
 +    TTransportFactory factory = factoryCache.get(maxFrameSize);
 +    if (factory == null) {
 +      factory = new TFramedTransport.Factory(maxFrameSize);
 +      factoryCache.put(maxFrameSize, factory);
 +    }
 +    return factory;
 +  }
 +
 +  /**
 +   * @see #transportFactory(int)
 +   */
 +  public static synchronized TTransportFactory transportFactory(long maxFrameSize) {
 +    if (maxFrameSize > Integer.MAX_VALUE || maxFrameSize < 1)
 +      throw new RuntimeException("Thrift transport frames are limited to " + Integer.MAX_VALUE);
 +    return transportFactory((int) maxFrameSize);
 +  }
 +
 +  /**
 +   * Create a TTransport for clients to the given address with the provided socket timeout and session-layer configuration
 +   *
 +   * @param address
 +   *          Server address to connect to
 +   * @param timeout
 +   *          Client socket timeout
 +   * @param sslParams
 +   *          RPC options for SSL servers
 +   * @param saslParams
 +   *          RPC options for SASL servers
 +   * @return An open TTransport which must be closed when finished
 +   */
 +  public static TTransport createClientTransport(HostAndPort address, int timeout, SslConnectionParams sslParams, SaslConnectionParams saslParams)
 +      throws TTransportException {
 +    boolean success = false;
 +    TTransport transport = null;
 +    try {
 +      if (sslParams != null) {
 +        // The check in AccumuloServerContext ensures that servers are brought up with sane configurations, but we also want to validate clients
 +        if (null != saslParams) {
 +          throw new IllegalStateException("Cannot use both SSL and SASL");
 +        }
 +
 +        log.trace("Creating SSL client transport");
 +
 +        // TSSLTransportFactory handles timeout 0 -> forever natively
 +        if (sslParams.useJsse()) {
 +          transport = TSSLTransportFactory.getClientSocket(address.getHostText(), address.getPort(), timeout);
 +        } else {
 +          // JDK6's factory doesn't appear to pass the protocol onto the Socket properly so we have
 +          // to do some magic to make sure that happens. Not an issue in JDK7
 +
 +          // Taken from thrift-0.9.1 to make the SSLContext
 +          SSLContext sslContext = createSSLContext(sslParams);
 +
 +          // Create the factory from it
 +          SSLSocketFactory sslSockFactory = sslContext.getSocketFactory();
 +
 +          // Wrap the real factory with our own that will set the protocol on the Socket before returning it
 +          ProtocolOverridingSSLSocketFactory wrappingSslSockFactory = new ProtocolOverridingSSLSocketFactory(sslSockFactory,
 +              new String[] {sslParams.getClientProtocol()});
 +
 +          // Create the TSocket from that
 +          transport = createClient(wrappingSslSockFactory, address.getHostText(), address.getPort(), timeout);
 +          // TSSLTransportFactory leaves transports open, so no need to open here
 +        }
 +
 +        transport = ThriftUtil.transportFactory().getTransport(transport);
 +      } else if (null != saslParams) {
 +        if (!UserGroupInformation.isSecurityEnabled()) {
 +          throw new IllegalStateException("Expected Kerberos security to be enabled if SASL is in use");
 +        }
 +
 +        log.trace("Creating SASL connection to {}:{}", address.getHostText(), address.getPort());
 +
 +        // Make sure a timeout is set
 +        try {
 +          transport = TTimeoutTransport.create(address, timeout);
 +        } catch (IOException e) {
 +          log.warn("Failed to open transport to {}", address);
 +          throw new TTransportException(e);
 +        }
 +
 +        try {
 +          // Log in via UGI, ensures we have logged in with our KRB credentials
 +          final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
 +
 +          // Is this pricey enough that we want to cache it?
 +          final String hostname = InetAddress.getByName(address.getHostText()).getCanonicalHostName();
 +
 +          final SaslMechanism mechanism = saslParams.getMechanism();
 +
 +          log.trace("Opening transport to server as {} to {}/{} using {}", currentUser, saslParams.getKerberosServerPrimary(), hostname, mechanism);
 +
 +          // Create the client SASL transport using the information for the server
 +          // Despite the 'protocol' argument seeming to be useless, it *must* be the primary of the server being connected to
 +          transport = new TSaslClientTransport(mechanism.getMechanismName(), null, saslParams.getKerberosServerPrimary(), hostname,
 +              saslParams.getSaslProperties(), saslParams.getCallbackHandler(), transport);
 +
 +          // Wrap it all in a processor which will run with a doAs the current user
 +          transport = new UGIAssumingTransport(transport, currentUser);
 +
 +          // Open the transport
 +          transport.open();
 +        } catch (TTransportException e) {
 +          log.warn("Failed to open SASL transport", e);
 +
 +          // We might have had a valid ticket, but it expired. We'll let the caller retry, but we will attempt to re-login to make the next attempt work.
 +          // Sadly, we have no way to determine the actual reason we got this TTransportException other than inspecting the exception msg.
 +          log.debug("Caught TTransportException opening SASL transport, checking if re-login is necessary before propagating the exception.");
 +          attemptClientReLogin();
 +
 +          throw e;
 +        } catch (IOException e) {
 +          log.warn("Failed to open SASL transport", e);
 +          throw new TTransportException(e);
 +        }
 +      } else {
 +        log.trace("Opening normal transport");
 +        if (timeout == 0) {
 +          transport = new TSocket(address.getHostText(), address.getPort());
 +          transport.open();
 +        } else {
 +          try {
 +            transport = TTimeoutTransport.create(address, timeout);
 +          } catch (IOException ex) {
 +            log.warn("Failed to open transport to " + address);
 +            throw new TTransportException(ex);
 +          }
 +
 +          // Open the transport
 +          transport.open();
 +        }
 +        transport = ThriftUtil.transportFactory().getTransport(transport);
 +      }
 +      success = true;
 +    } finally {
 +      if (!success && transport != null) {
 +        transport.close();
 +      }
 +    }
 +    return transport;
 +  }
 +
 +  /**
 +   * Some wonderful snippets of documentation from HBase on performing the re-login client-side (as well as server-side) in the following paragraph. We want to
 +   * attempt a re-login to automatically refresh the client's Krb "credentials" (remember, a server might also be a client, master sending RPC to tserver), but
 +   * we have to take care to avoid Kerberos' replay attack protection.
 +   * <p>
 +   * If multiple clients with the same principal try to connect to the same server at the same time, the server assumes a replay attack is in progress. This is
 +   * a feature of kerberos. In order to work around this, what is done is that the client backs off randomly and tries to initiate the connection again. The
 +   * other problem is to do with ticket expiry. To handle that, a relogin is attempted.
 +   */
 +  static void attemptClientReLogin() {
 +    try {
 +      UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
 +      if (null == loginUser || !loginUser.hasKerberosCredentials()) {
 +        // We should have already checked that we're logged in and have credentials. A precondition-like check.
 +        throw new RuntimeException("Expected to find Kerberos UGI credentials, but did not");
 +      }
 +      UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
 +      // A Proxy user is the "effective user" (in name only), riding on top of the "real user"'s Krb credentials.
 +      UserGroupInformation realUser = currentUser.getRealUser();
 +
 +      // re-login only in case it is the login user or superuser.
 +      if (loginUser.equals(currentUser) || loginUser.equals(realUser)) {
 +        if (UserGroupInformation.isLoginKeytabBased()) {
 +          log.info("Performing keytab-based Kerberos re-login");
 +          loginUser.reloginFromKeytab();
 +        } else {
 +          log.info("Performing ticket-cache-based Kerberos re-login");
 +          loginUser.reloginFromTicketCache();
 +        }
 +
 +        // Avoid the replay attack protection, sleep 1 to 5000ms
 +        try {
 +          Thread.sleep((SASL_BACKOFF_RAND.nextInt(RELOGIN_MAX_BACKOFF) + 1));
 +        } catch (InterruptedException e) {
 +          Thread.currentThread().interrupt();
 +          return;
 +        }
 +      } else {
 +        log.debug("Not attempting Kerberos re-login: loginUser={}, currentUser={}, realUser={}", loginUser, currentUser, realUser);
 +      }
 +    } catch (IOException e) {
 +      // The inability to check is worrisome and deserves a RuntimeException instead of a propagated IO-like Exception.
 +      log.warn("Failed to check (and/or perform) Kerberos client re-login", e);
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  /**
 +   * Lifted from TSSLTransportFactory in Thrift-0.9.1. The method to create a client socket with an SSLContextFactory object is not visibile to us. Have to use
 +   * SslConnectionParams instead of TSSLTransportParameters because no getters exist on TSSLTransportParameters.
 +   *
 +   * @param params
 +   *          Parameters to use to create the SSLContext
 +   */
 +  private static SSLContext createSSLContext(SslConnectionParams params) throws TTransportException {
 +    SSLContext ctx;
 +    try {
 +      ctx = SSLContext.getInstance(params.getClientProtocol());
 +      TrustManagerFactory tmf = null;
 +      KeyManagerFactory kmf = null;
 +
 +      if (params.isTrustStoreSet()) {
 +        tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
 +        KeyStore ts = KeyStore.getInstance(params.getTrustStoreType());
 +        try (FileInputStream fis = new FileInputStream(params.getTrustStorePath())) {
 +          ts.load(fis, params.getTrustStorePass().toCharArray());
 +        }
 +        tmf.init(ts);
 +      }
 +
 +      if (params.isKeyStoreSet()) {
 +        kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
 +        KeyStore ks = KeyStore.getInstance(params.getKeyStoreType());
 +        try (FileInputStream fis = new FileInputStream(params.getKeyStorePath())) {
 +          ks.load(fis, params.getKeyStorePass().toCharArray());
 +        }
 +        kmf.init(ks, params.getKeyStorePass().toCharArray());
 +      }
 +
 +      if (params.isKeyStoreSet() && params.isTrustStoreSet()) {
 +        ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
 +      } else if (params.isKeyStoreSet()) {
 +        ctx.init(kmf.getKeyManagers(), null, null);
 +      } else {
 +        ctx.init(null, tmf.getTrustManagers(), null);
 +      }
 +
 +    } catch (Exception e) {
 +      throw new TTransportException("Error creating the transport", e);
 +    }
 +    return ctx;
 +  }
 +
 +  /**
 +   * Lifted from Thrift-0.9.1 because it was private. Create an SSLSocket with the given factory, host:port, and timeout.
 +   *
 +   * @param factory
 +   *          Factory to create the socket from
 +   * @param host
 +   *          Destination host
 +   * @param port
 +   *          Destination port
 +   * @param timeout
 +   *          Socket timeout
 +   */
 +  private static TSocket createClient(SSLSocketFactory factory, String host, int port, int timeout) throws TTransportException {
++    SSLSocket socket = null;
 +    try {
-       SSLSocket socket = (SSLSocket) factory.createSocket(host, port);
++      socket = (SSLSocket) factory.createSocket(host, port);
 +      socket.setSoTimeout(timeout);
 +      return new TSocket(socket);
 +    } catch (Exception e) {
++      try {
++        if (socket != null)
++          socket.close();
++      } catch (IOException ioe) {}
++
 +      throw new TTransportException("Could not connect to " + host + " on port " + port, e);
 +    }
 +  }
 +}


[03/28] accumulo git commit: ACCUMULO-4317 Ensure the socket is closed on failure to set up a Thrift client

Posted by el...@apache.org.
ACCUMULO-4317 Ensure the socket is closed on failure to set up a Thrift client

Signed-off-by: Josh Elser <el...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/561f1893
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/561f1893
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/561f1893

Branch: refs/heads/master
Commit: 561f1893b9f239457e16c046702d8ba9df1474e2
Parents: d5ea00f
Author: Michiel Vanderlee <ma...@gmail.com>
Authored: Mon Jun 20 18:09:37 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 18:09:37 2016 -0400

----------------------------------------------------------------------
 .../accumulo/core/util/TTimeoutTransport.java   | 24 ++++++++++++++------
 .../apache/accumulo/core/util/ThriftUtil.java   |  8 ++++++-
 2 files changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/561f1893/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
index 1eac8be..52a9a98 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
@@ -66,12 +66,22 @@ public class TTimeoutTransport {
   }
 
   public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException {
-    Socket socket = SelectorProvider.provider().openSocketChannel().socket();
-    socket.setSoLinger(false, 0);
-    socket.setTcpNoDelay(true);
-    socket.connect(addr);
-    InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
-    OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
-    return new TIOStreamTransport(input, output);
+    Socket socket = null;
+    try {
+      socket = SelectorProvider.provider().openSocketChannel().socket();
+      socket.setSoLinger(false, 0);
+      socket.setTcpNoDelay(true);
+      socket.connect(addr);
+      InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
+      OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
+      return new TIOStreamTransport(input, output);
+    } catch (IOException e) {
+      try {
+        if (socket != null)
+          socket.close();
+      } catch (IOException ioe) {}
+
+      throw e;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/561f1893/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
index 29f7fb7..48ff3f2 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
@@ -349,11 +349,17 @@ public class ThriftUtil {
    *          Socket timeout
    */
   private static TSocket createClient(SSLSocketFactory factory, String host, int port, int timeout) throws TTransportException {
+    SSLSocket socket = null;
     try {
-      SSLSocket socket = (SSLSocket) factory.createSocket(host, port);
+      socket = (SSLSocket) factory.createSocket(host, port);
       socket.setSoTimeout(timeout);
       return new TSocket(socket);
     } catch (Exception e) {
+      try {
+        if (socket != null)
+          socket.close();
+      } catch (IOException ioe) {}
+
       throw new TTransportException("Could not connect to " + host + " on port " + port, e);
     }
   }


[17/28] accumulo git commit: Merge remote-tracking branch 'origin/1.6' into 1.6

Posted by el...@apache.org.
Merge remote-tracking branch 'origin/1.6' into 1.6


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f66b9a0d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f66b9a0d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f66b9a0d

Branch: refs/heads/1.6
Commit: f66b9a0d18cde88bdc269421527ebc5a35445dda
Parents: 561f189 31a2224
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 23:28:49 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 23:28:49 2016 -0400

----------------------------------------------------------------------
 pom.xml                                                       | 2 +-
 .../org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java  | 7 +++----
 2 files changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------



[28/28] accumulo git commit: Merge branch '1.8'

Posted by el...@apache.org.
Merge branch '1.8'


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c2a3b685
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c2a3b685
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c2a3b685

Branch: refs/heads/master
Commit: c2a3b685cf32be5fe2c4e33b691085d277473598
Parents: 7bd9c08 f45a65c
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 23:33:21 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 23:33:21 2016 -0400

----------------------------------------------------------------------

----------------------------------------------------------------------



[26/28] accumulo git commit: Merge branch '1.7' into 1.8

Posted by el...@apache.org.
Merge branch '1.7' into 1.8


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f45a65cb
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f45a65cb
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f45a65cb

Branch: refs/heads/1.8
Commit: f45a65cb241e610351ad5e36302bd59ad87755e7
Parents: af80261 d3e687e
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 23:33:14 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 23:33:14 2016 -0400

----------------------------------------------------------------------

----------------------------------------------------------------------



[04/28] accumulo git commit: ACCUMULO-4317 Ensure the socket is closed on failure to set up a Thrift client

Posted by el...@apache.org.
ACCUMULO-4317 Ensure the socket is closed on failure to set up a Thrift client

Signed-off-by: Josh Elser <el...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/561f1893
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/561f1893
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/561f1893

Branch: refs/heads/1.8
Commit: 561f1893b9f239457e16c046702d8ba9df1474e2
Parents: d5ea00f
Author: Michiel Vanderlee <ma...@gmail.com>
Authored: Mon Jun 20 18:09:37 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 18:09:37 2016 -0400

----------------------------------------------------------------------
 .../accumulo/core/util/TTimeoutTransport.java   | 24 ++++++++++++++------
 .../apache/accumulo/core/util/ThriftUtil.java   |  8 ++++++-
 2 files changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/561f1893/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
index 1eac8be..52a9a98 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
@@ -66,12 +66,22 @@ public class TTimeoutTransport {
   }
 
   public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException {
-    Socket socket = SelectorProvider.provider().openSocketChannel().socket();
-    socket.setSoLinger(false, 0);
-    socket.setTcpNoDelay(true);
-    socket.connect(addr);
-    InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
-    OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
-    return new TIOStreamTransport(input, output);
+    Socket socket = null;
+    try {
+      socket = SelectorProvider.provider().openSocketChannel().socket();
+      socket.setSoLinger(false, 0);
+      socket.setTcpNoDelay(true);
+      socket.connect(addr);
+      InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
+      OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
+      return new TIOStreamTransport(input, output);
+    } catch (IOException e) {
+      try {
+        if (socket != null)
+          socket.close();
+      } catch (IOException ioe) {}
+
+      throw e;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/561f1893/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
index 29f7fb7..48ff3f2 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
@@ -349,11 +349,17 @@ public class ThriftUtil {
    *          Socket timeout
    */
   private static TSocket createClient(SSLSocketFactory factory, String host, int port, int timeout) throws TTransportException {
+    SSLSocket socket = null;
     try {
-      SSLSocket socket = (SSLSocket) factory.createSocket(host, port);
+      socket = (SSLSocket) factory.createSocket(host, port);
       socket.setSoTimeout(timeout);
       return new TSocket(socket);
     } catch (Exception e) {
+      try {
+        if (socket != null)
+          socket.close();
+      } catch (IOException ioe) {}
+
       throw new TTransportException("Could not connect to " + host + " on port " + port, e);
     }
   }


[23/28] accumulo git commit: Merge remote-tracking branch 'origin/1.7' into 1.7

Posted by el...@apache.org.
Merge remote-tracking branch 'origin/1.7' into 1.7


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d3e687e9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d3e687e9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d3e687e9

Branch: refs/heads/1.7
Commit: d3e687e93f1a4d01ca3038c52896ac5d9a2c4af1
Parents: 78a56a2 976c0aa
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 23:32:55 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 23:32:55 2016 -0400

----------------------------------------------------------------------
 .../master/balancer/BaseHostRegexTableLoadBalancerTest.java   | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[22/28] accumulo git commit: Merge remote-tracking branch 'origin/1.7' into 1.7

Posted by el...@apache.org.
Merge remote-tracking branch 'origin/1.7' into 1.7


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d3e687e9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d3e687e9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d3e687e9

Branch: refs/heads/master
Commit: d3e687e93f1a4d01ca3038c52896ac5d9a2c4af1
Parents: 78a56a2 976c0aa
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 23:32:55 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 23:32:55 2016 -0400

----------------------------------------------------------------------
 .../master/balancer/BaseHostRegexTableLoadBalancerTest.java   | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[16/28] accumulo git commit: Merge remote-tracking branch 'origin/1.6' into 1.6

Posted by el...@apache.org.
Merge remote-tracking branch 'origin/1.6' into 1.6


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f66b9a0d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f66b9a0d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f66b9a0d

Branch: refs/heads/master
Commit: f66b9a0d18cde88bdc269421527ebc5a35445dda
Parents: 561f189 31a2224
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 23:28:49 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 23:28:49 2016 -0400

----------------------------------------------------------------------
 pom.xml                                                       | 2 +-
 .../org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java  | 7 +++----
 2 files changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------