You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2016/06/21 03:40:03 UTC
[13/28] accumulo git commit: ACCUMULO-4317 Refactor TTimeoutTransport
to be able to better test it
ACCUMULO-4317 Refactor TTimeoutTransport to be able to better test it
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7bd9c088
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7bd9c088
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7bd9c088
Branch: refs/heads/master
Commit: 7bd9c0882efec7d42b83870b3cc3436471359472
Parents: bfc2a5b
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 22:44:55 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 22:44:55 2016 -0400
----------------------------------------------------------------------
.../accumulo/core/rpc/TTimeoutTransport.java | 146 ++++++++++++++++--
.../core/rpc/TTimeoutTransportTest.java | 151 +++++++++++++++++++
2 files changed, 286 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bd9c088/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
index cc3f51b..809975f 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
@@ -30,16 +30,31 @@ import java.nio.channels.spi.SelectorProvider;
import org.apache.hadoop.net.NetUtils;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.net.HostAndPort;
+/**
+ * A utility class for setting up a {@link TTransport} with various necessary configurations for ideal performance in Accumulo. These configurations include:
+ * <ul>
+ * <li>Setting SO_LINGER=false on the socket.</li>
+ * <li>Setting TCP_NO_DELAY=true on the socket.</li>
+ * <li>Setting timeouts on the I/OStreams.</li>
+ * </ul>
+ */
public class TTimeoutTransport {
+ private static final Logger log = LoggerFactory.getLogger(TTimeoutTransport.class);
+
+ private static final TTimeoutTransport INSTANCE = new TTimeoutTransport();
+
+ private volatile Method GET_INPUT_STREAM_METHOD = null;
- private static volatile Method GET_INPUT_STREAM_METHOD = null;
+ private TTimeoutTransport() {}
- private static Method getNetUtilsInputStreamMethod() {
+ private Method getNetUtilsInputStreamMethod() {
if (null == GET_INPUT_STREAM_METHOD) {
- synchronized (TTimeoutTransport.class) {
+ synchronized (this) {
if (null == GET_INPUT_STREAM_METHOD) {
try {
GET_INPUT_STREAM_METHOD = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE);
@@ -53,35 +68,144 @@ public class TTimeoutTransport {
return GET_INPUT_STREAM_METHOD;
}
- private static InputStream getInputStream(Socket socket, long timeout) {
+ /**
+ * Invokes the <code>NetUtils.getInputStream(Socket, long)</code> using reflection to handle compatibility with both Hadoop 1 and 2.
+ *
+ * @param socket
+ * The socket to create the input stream on
+ * @param timeout
+ * The timeout for the input stream in milliseconds
+ * @return An InputStream on the socket
+ */
+ private InputStream getInputStream(Socket socket, long timeout) throws IOException {
try {
return (InputStream) getNetUtilsInputStreamMethod().invoke(null, socket, timeout);
} catch (Exception e) {
- throw new RuntimeException(e);
+ Throwable cause = e.getCause();
+ // Try to re-throw the IOException directly
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+
+ if (e instanceof RuntimeException) {
+ // Don't re-wrap another RTE around an RTE
+ throw (RuntimeException) e;
+ } else {
+ throw new RuntimeException(e);
+ }
}
}
+ /**
+ * Creates a Thrift TTransport to the given address with the given timeout. All created resources are closed if an exception is thrown.
+ *
+ * @param addr
+ * The address to connect the client to
+ * @param timeoutMillis
+ * The timeout in milliseconds for the connection
+ * @return A TTransport connected to the given <code>addr</code>
+ * @throws IOException
+ * If the transport fails to be created/connected
+ */
public static TTransport create(HostAndPort addr, long timeoutMillis) throws IOException {
- return create(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis);
+ return INSTANCE.createInternal(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis);
}
+ /**
+ * Creates a Thrift TTransport to the given address with the given timeout. All created resources are closed if an exception is thrown.
+ *
+ * @param addr
+ * The address to connect the client to
+ * @param timeoutMillis
+ * The timeout in milliseconds for the connection
+ * @return A TTransport connected to the given <code>addr</code>
+ * @throws IOException
+ * If the transport fails to be created/connected
+ */
public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException {
+ return INSTANCE.createInternal(addr, timeoutMillis);
+ }
+
+ /**
+ * Opens a socket to the given <code>addr</code>, configures the socket, and then creates a Thrift transport using the socket.
+ *
+ * @param addr
+ * The address the socket should connect
+ * @param timeoutMillis
+ * The socket timeout in milliseconds
+ * @return A TTransport instance to the given <code>addr</code>
+ * @throws IOException
+ * If the Thrift client is failed to be connected/created
+ */
+ protected TTransport createInternal(SocketAddress addr, long timeoutMillis) throws IOException {
Socket socket = null;
try {
- socket = SelectorProvider.provider().openSocketChannel().socket();
+ socket = openSocket(addr);
+ } catch (IOException e) {
+ // openSocket handles closing the Socket on error
+ throw e;
+ }
+
+ // Should be non-null
+ assert null != socket;
+
+ // Set up the streams
+ try {
+ InputStream input = wrapInputStream(socket, timeoutMillis);
+ OutputStream output = wrapOutputStream(socket, timeoutMillis);
+ return new TIOStreamTransport(input, output);
+ } catch (IOException e) {
+ try {
+ socket.close();
+ } catch (IOException ioe) {
+ log.error("Failed to close socket after unsuccessful I/O stream setup", e);
+ }
+
+ throw e;
+ }
+ }
+
+ // Visible for testing
+ protected InputStream wrapInputStream(Socket socket, long timeoutMillis) throws IOException {
+ return new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
+ }
+
+ // Visible for testing
+ protected OutputStream wrapOutputStream(Socket socket, long timeoutMillis) throws IOException {
+ return new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
+ }
+
+ /**
+ * Opens and configures a {@link Socket} for Accumulo RPC.
+ *
+ * @param addr
+ * The address to connect the socket to
+ * @return A socket connected to the given address, or null if the socket fails to connect
+ */
+ protected Socket openSocket(SocketAddress addr) throws IOException {
+ Socket socket = null;
+ try {
+ socket = openSocketChannel();
socket.setSoLinger(false, 0);
socket.setTcpNoDelay(true);
socket.connect(addr);
- InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
- OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
- return new TIOStreamTransport(input, output);
+ return socket;
} catch (IOException e) {
try {
if (socket != null)
socket.close();
- } catch (IOException ioe) {}
+ } catch (IOException ioe) {
+ log.error("Failed to close socket after unsuccessful open.", e);
+ }
throw e;
}
}
+
+ /**
+ * Opens a socket channel and returns the underlying socket.
+ */
+ protected Socket openSocketChannel() throws IOException {
+ return SelectorProvider.provider().openSocketChannel().socket();
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bd9c088/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java b/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java
new file mode 100644
index 0000000..cedac9c
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.net.SocketAddress;
+
+import org.junit.Test;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link TTimeoutTransport}.
+ */
+public class TTimeoutTransportTest {
+
+ void expectedSocketSetup(Socket s) throws IOException {
+ s.setSoLinger(false, 0);
+ expectLastCall().once();
+ s.setTcpNoDelay(true);
+ expectLastCall().once();
+ }
+
+ @Test
+ public void testFailedSocketOpenIsClosed() throws IOException {
+ SocketAddress addr = createMock(SocketAddress.class);
+ Socket s = createMock(Socket.class);
+ TTimeoutTransport timeoutTransport = createMockBuilder(TTimeoutTransport.class).addMockedMethod("openSocketChannel").createMock();
+
+ // Return out mocked socket
+ expect(timeoutTransport.openSocketChannel()).andReturn(s).once();
+
+ // tcpnodelay and solinger
+ expectedSocketSetup(s);
+
+ // Connect to the addr
+ s.connect(addr);
+ expectLastCall().andThrow(new IOException());
+
+ // The socket should be closed after the above IOException
+ s.close();
+
+ replay(addr, s, timeoutTransport);
+
+ try {
+ timeoutTransport.openSocket(addr);
+ fail("Expected to catch IOException but got none");
+ } catch (IOException e) {
+ // Expected
+ }
+
+ verify(addr, s, timeoutTransport);
+ }
+
+ @Test
+ public void testFailedInputStreamClosesSocket() throws IOException {
+ long timeout = 2 * 60 * 1000; // 2 mins
+ SocketAddress addr = createMock(SocketAddress.class);
+ Socket s = createMock(Socket.class);
+ TTimeoutTransport timeoutTransport = createMockBuilder(TTimeoutTransport.class).addMockedMethod("openSocketChannel").addMockedMethod("wrapInputStream")
+ .createMock();
+
+ // Return out mocked socket
+ expect(timeoutTransport.openSocketChannel()).andReturn(s).once();
+
+ // tcpnodelay and solinger
+ expectedSocketSetup(s);
+
+ // Connect to the addr
+ s.connect(addr);
+ expectLastCall().once();
+
+ expect(timeoutTransport.wrapInputStream(s, timeout)).andThrow(new IOException());
+
+ // The socket should be closed after the above IOException
+ s.close();
+
+ replay(addr, s, timeoutTransport);
+
+ try {
+ timeoutTransport.createInternal(addr, timeout);
+ fail("Expected to catch IOException but got none");
+ } catch (IOException e) {
+ // Expected
+ }
+
+ verify(addr, s, timeoutTransport);
+ }
+
+ @Test
+ public void testFailedOutputStreamClosesSocket() throws IOException {
+ long timeout = 2 * 60 * 1000; // 2 mins
+ SocketAddress addr = createMock(SocketAddress.class);
+ Socket s = createMock(Socket.class);
+ InputStream is = createMock(InputStream.class);
+ TTimeoutTransport timeoutTransport = createMockBuilder(TTimeoutTransport.class).addMockedMethod("openSocketChannel").addMockedMethod("wrapInputStream")
+ .addMockedMethod("wrapOutputStream").createMock();
+
+ // Return out mocked socket
+ expect(timeoutTransport.openSocketChannel()).andReturn(s).once();
+
+ // tcpnodelay and solinger
+ expectedSocketSetup(s);
+
+ // Connect to the addr
+ s.connect(addr);
+ expectLastCall().once();
+
+ // Input stream is set up
+ expect(timeoutTransport.wrapInputStream(s, timeout)).andReturn(is);
+ // Output stream fails to be set up
+ expect(timeoutTransport.wrapOutputStream(s, timeout)).andThrow(new IOException());
+
+ // The socket should be closed after the above IOException
+ s.close();
+
+ replay(addr, s, timeoutTransport);
+
+ try {
+ timeoutTransport.createInternal(addr, timeout);
+ fail("Expected to catch IOException but got none");
+ } catch (IOException e) {
+ // Expected
+ }
+
+ verify(addr, s, timeoutTransport);
+ }
+
+}