You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2016/06/21 03:40:03 UTC

[13/28] accumulo git commit: ACCUMULO-4317 Refactor TTimeoutTransport to be able to better test it

ACCUMULO-4317 Refactor TTimeoutTransport to be able to better test it


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7bd9c088
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7bd9c088
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7bd9c088

Branch: refs/heads/master
Commit: 7bd9c0882efec7d42b83870b3cc3436471359472
Parents: bfc2a5b
Author: Josh Elser <el...@apache.org>
Authored: Mon Jun 20 22:44:55 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jun 20 22:44:55 2016 -0400

----------------------------------------------------------------------
 .../accumulo/core/rpc/TTimeoutTransport.java    | 146 ++++++++++++++++--
 .../core/rpc/TTimeoutTransportTest.java         | 151 +++++++++++++++++++
 2 files changed, 286 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bd9c088/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
index cc3f51b..809975f 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
@@ -30,16 +30,31 @@ import java.nio.channels.spi.SelectorProvider;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.thrift.transport.TIOStreamTransport;
 import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.net.HostAndPort;
 
+/**
+ * A utility class for setting up a {@link TTransport} with various necessary configurations for ideal performance in Accumulo. These configurations include:
+ * <ul>
+ * <li>Setting SO_LINGER=false on the socket.</li>
+ * <li>Setting TCP_NO_DELAY=true on the socket.</li>
+ * <li>Setting timeouts on the I/O streams.</li>
+ * </ul>
+ */
 public class TTimeoutTransport {
+  private static final Logger log = LoggerFactory.getLogger(TTimeoutTransport.class);
+
+  private static final TTimeoutTransport INSTANCE = new TTimeoutTransport();
+
+  private volatile Method GET_INPUT_STREAM_METHOD = null;
 
-  private static volatile Method GET_INPUT_STREAM_METHOD = null;
+  private TTimeoutTransport() {}
 
-  private static Method getNetUtilsInputStreamMethod() {
+  private Method getNetUtilsInputStreamMethod() {
     if (null == GET_INPUT_STREAM_METHOD) {
-      synchronized (TTimeoutTransport.class) {
+      synchronized (this) {
         if (null == GET_INPUT_STREAM_METHOD) {
           try {
             GET_INPUT_STREAM_METHOD = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE);
@@ -53,35 +68,144 @@ public class TTimeoutTransport {
     return GET_INPUT_STREAM_METHOD;
   }
 
-  private static InputStream getInputStream(Socket socket, long timeout) {
+  /**
+   * Invokes the <code>NetUtils.getInputStream(Socket, long)</code> using reflection to handle compatibility with both Hadoop 1 and 2.
+   *
+   * @param socket
+   *          The socket to create the input stream on
+   * @param timeout
+   *          The timeout for the input stream in milliseconds
+   * @return An InputStream on the socket
+   */
+  private InputStream getInputStream(Socket socket, long timeout) throws IOException {
     try {
       return (InputStream) getNetUtilsInputStreamMethod().invoke(null, socket, timeout);
     } catch (Exception e) {
-      throw new RuntimeException(e);
+      Throwable cause = e.getCause();
+      // Try to re-throw the IOException directly
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      }
+
+      if (e instanceof RuntimeException) {
+        // Don't re-wrap another RTE around an RTE
+        throw (RuntimeException) e;
+      } else {
+        throw new RuntimeException(e);
+      }
     }
   }
 
+  /**
+   * Creates a Thrift TTransport to the given address with the given timeout. All created resources are closed if an exception is thrown.
+   *
+   * @param addr
+   *          The address to connect the client to
+   * @param timeoutMillis
+   *          The timeout in milliseconds for the connection
+   * @return A TTransport connected to the given <code>addr</code>
+   * @throws IOException
+   *           If the transport fails to be created/connected
+   */
   public static TTransport create(HostAndPort addr, long timeoutMillis) throws IOException {
-    return create(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis);
+    return INSTANCE.createInternal(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis);
   }
 
+  /**
+   * Creates a Thrift TTransport to the given address with the given timeout. All created resources are closed if an exception is thrown.
+   *
+   * @param addr
+   *          The address to connect the client to
+   * @param timeoutMillis
+   *          The timeout in milliseconds for the connection
+   * @return A TTransport connected to the given <code>addr</code>
+   * @throws IOException
+   *           If the transport fails to be created/connected
+   */
   public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException {
+    return INSTANCE.createInternal(addr, timeoutMillis);
+  }
+
+  /**
+   * Opens a socket to the given <code>addr</code>, configures the socket, and then creates a Thrift transport using the socket.
+   *
+   * @param addr
+   *          The address the socket should connect to
+   * @param timeoutMillis
+   *          The socket timeout in milliseconds
+   * @return A TTransport instance to the given <code>addr</code>
+   * @throws IOException
+   *           If the Thrift client fails to be connected/created
+   */
+  protected TTransport createInternal(SocketAddress addr, long timeoutMillis) throws IOException {
+    Socket socket = null;
+    try {
+      socket = openSocket(addr);
+    } catch (IOException e) {
+      // openSocket handles closing the Socket on error
+      throw e;
+    }
+
+    // Should be non-null
+    assert null != socket;
+
+    // Set up the streams
+    try {
+      InputStream input = wrapInputStream(socket, timeoutMillis);
+      OutputStream output = wrapOutputStream(socket, timeoutMillis);
+      return new TIOStreamTransport(input, output);
+    } catch (IOException e) {
+      try {
+        socket.close();
+      } catch (IOException ioe) {
+        log.error("Failed to close socket after unsuccessful I/O stream setup", ioe);
+      }
+
+      throw e;
+    }
+  }
+
+  // Visible for testing
+  protected InputStream wrapInputStream(Socket socket, long timeoutMillis) throws IOException {
+    return new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
+  }
+
+  // Visible for testing
+  protected OutputStream wrapOutputStream(Socket socket, long timeoutMillis) throws IOException {
+    return new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
+  }
+
+  /**
+   * Opens and configures a {@link Socket} for Accumulo RPC.
+   *
+   * @param addr
+   *          The address to connect the socket to
+   * @return A socket connected to the given address; never null, since a failure closes the socket and rethrows the IOException
+   */
+  protected Socket openSocket(SocketAddress addr) throws IOException {
+    Socket socket = null;
+    try {
+      socket = openSocketChannel();
       socket.setSoLinger(false, 0);
       socket.setTcpNoDelay(true);
       socket.connect(addr);
-      InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
-      OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
-      return new TIOStreamTransport(input, output);
+      return socket;
     } catch (IOException e) {
       try {
         if (socket != null)
           socket.close();
-      } catch (IOException ioe) {}
+      } catch (IOException ioe) {
+        log.error("Failed to close socket after unsuccessful open.", ioe);
+      }
 
       throw e;
     }
   }
+
+  /**
+   * Opens a socket channel and returns the underlying socket.
+   */
+  protected Socket openSocketChannel() throws IOException {
+    return SelectorProvider.provider().openSocketChannel().socket();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bd9c088/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java b/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java
new file mode 100644
index 0000000..cedac9c
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.net.SocketAddress;
+
+import org.junit.Test;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link TTimeoutTransport}.
+ */
+public class TTimeoutTransportTest {
+
+  void expectedSocketSetup(Socket s) throws IOException {
+    s.setSoLinger(false, 0);
+    expectLastCall().once();
+    s.setTcpNoDelay(true);
+    expectLastCall().once();
+  }
+
+  @Test
+  public void testFailedSocketOpenIsClosed() throws IOException {
+    SocketAddress addr = createMock(SocketAddress.class);
+    Socket s = createMock(Socket.class);
+    TTimeoutTransport timeoutTransport = createMockBuilder(TTimeoutTransport.class).addMockedMethod("openSocketChannel").createMock();
+
+    // Return our mocked socket
+    expect(timeoutTransport.openSocketChannel()).andReturn(s).once();
+
+    // tcpnodelay and solinger
+    expectedSocketSetup(s);
+
+    // Connect to the addr
+    s.connect(addr);
+    expectLastCall().andThrow(new IOException());
+
+    // The socket should be closed after the above IOException
+    s.close();
+
+    replay(addr, s, timeoutTransport);
+
+    try {
+      timeoutTransport.openSocket(addr);
+      fail("Expected to catch IOException but got none");
+    } catch (IOException e) {
+      // Expected
+    }
+
+    verify(addr, s, timeoutTransport);
+  }
+
+  @Test
+  public void testFailedInputStreamClosesSocket() throws IOException {
+    long timeout = 2 * 60 * 1000; // 2 mins
+    SocketAddress addr = createMock(SocketAddress.class);
+    Socket s = createMock(Socket.class);
+    TTimeoutTransport timeoutTransport = createMockBuilder(TTimeoutTransport.class).addMockedMethod("openSocketChannel").addMockedMethod("wrapInputStream")
+        .createMock();
+
+    // Return our mocked socket
+    expect(timeoutTransport.openSocketChannel()).andReturn(s).once();
+
+    // tcpnodelay and solinger
+    expectedSocketSetup(s);
+
+    // Connect to the addr
+    s.connect(addr);
+    expectLastCall().once();
+
+    expect(timeoutTransport.wrapInputStream(s, timeout)).andThrow(new IOException());
+
+    // The socket should be closed after the above IOException
+    s.close();
+
+    replay(addr, s, timeoutTransport);
+
+    try {
+      timeoutTransport.createInternal(addr, timeout);
+      fail("Expected to catch IOException but got none");
+    } catch (IOException e) {
+      // Expected
+    }
+
+    verify(addr, s, timeoutTransport);
+  }
+
+  @Test
+  public void testFailedOutputStreamClosesSocket() throws IOException {
+    long timeout = 2 * 60 * 1000; // 2 mins
+    SocketAddress addr = createMock(SocketAddress.class);
+    Socket s = createMock(Socket.class);
+    InputStream is = createMock(InputStream.class);
+    TTimeoutTransport timeoutTransport = createMockBuilder(TTimeoutTransport.class).addMockedMethod("openSocketChannel").addMockedMethod("wrapInputStream")
+        .addMockedMethod("wrapOutputStream").createMock();
+
+    // Return our mocked socket
+    expect(timeoutTransport.openSocketChannel()).andReturn(s).once();
+
+    // tcpnodelay and solinger
+    expectedSocketSetup(s);
+
+    // Connect to the addr
+    s.connect(addr);
+    expectLastCall().once();
+
+    // Input stream is set up
+    expect(timeoutTransport.wrapInputStream(s, timeout)).andReturn(is);
+    // Output stream fails to be set up
+    expect(timeoutTransport.wrapOutputStream(s, timeout)).andThrow(new IOException());
+
+    // The socket should be closed after the above IOException
+    s.close();
+
+    replay(addr, s, timeoutTransport);
+
+    try {
+      timeoutTransport.createInternal(addr, timeout);
+      fail("Expected to catch IOException but got none");
+    } catch (IOException e) {
+      // Expected
+    }
+
+    verify(addr, s, timeoutTransport);
+  }
+
+}