You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2020/03/06 17:03:35 UTC

[accumulo] branch master updated: Remove some reflection where not needed (#1550)

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new fb68612  Remove some reflection where not needed (#1550)
fb68612 is described below

commit fb6861200bec7d45f44419a89e64a2fbe913104b
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Fri Mar 6 12:03:24 2020 -0500

    Remove some reflection where not needed (#1550)
    
    Remove unnecessary reflection to load methods that are contained in
    public stable Hadoop APIs.
    
    This reflection was added for simultaneous Hadoop 1 and Hadoop 2
    support. We now require Hadoop 3 and these APIs are stable, so
    reflection is not needed.
    
    Also reduce visibility of some methods in TTimeoutTransport exposed for
    testing and delete unnecessary code.
---
 .../accumulo/core/rpc/TTimeoutTransport.java       | 77 ++--------------------
 .../org/apache/accumulo/tserver/log/DfsLogger.java | 23 +++----
 .../main/java/org/apache/accumulo/start/Main.java  |  3 +-
 3 files changed, 17 insertions(+), 86 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
index c74a5a3..10bff34 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
@@ -23,7 +23,6 @@ import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketAddress;
@@ -50,56 +49,8 @@ public class TTimeoutTransport {
 
   private static final TTimeoutTransport INSTANCE = new TTimeoutTransport();
 
-  private volatile Method GET_INPUT_STREAM_METHOD = null;
-
   private TTimeoutTransport() {}
 
-  private Method getNetUtilsInputStreamMethod() {
-    if (GET_INPUT_STREAM_METHOD == null) {
-      synchronized (this) {
-        if (GET_INPUT_STREAM_METHOD == null) {
-          try {
-            GET_INPUT_STREAM_METHOD =
-                NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE);
-          } catch (Exception e) {
-            throw new RuntimeException(e);
-          }
-        }
-      }
-    }
-
-    return GET_INPUT_STREAM_METHOD;
-  }
-
-  /**
-   * Invokes the <code>NetUtils.getInputStream(Socket, long)</code> using reflection to handle
-   * compatibility with both Hadoop 1 and 2.
-   *
-   * @param socket
-   *          The socket to create the input stream on
-   * @param timeout
-   *          The timeout for the input stream in milliseconds
-   * @return An InputStream on the socket
-   */
-  private InputStream getInputStream(Socket socket, long timeout) throws IOException {
-    try {
-      return (InputStream) getNetUtilsInputStreamMethod().invoke(null, socket, timeout);
-    } catch (Exception e) {
-      Throwable cause = e.getCause();
-      // Try to re-throw the IOException directly
-      if (cause instanceof IOException) {
-        throw (IOException) cause;
-      }
-
-      if (e instanceof RuntimeException) {
-        // Don't re-wrap another RTE around an RTE
-        throw (RuntimeException) e;
-      } else {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
   /**
    * Creates a Thrift TTransport to the given address with the given timeout. All created resources
    * are closed if an exception is thrown.
@@ -118,22 +69,6 @@ public class TTimeoutTransport {
   }
 
   /**
-   * Creates a Thrift TTransport to the given address with the given timeout. All created resources
-   * are closed if an exception is thrown.
-   *
-   * @param addr
-   *          The address to connect the client to
-   * @param timeoutMillis
-   *          The timeout in milliseconds for the connection
-   * @return A TTransport connected to the given <code>addr</code>
-   * @throws IOException
-   *           If the transport fails to be created/connected
-   */
-  public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException {
-    return INSTANCE.createInternal(addr, timeoutMillis);
-  }
-
-  /**
    * Opens a socket to the given <code>addr</code>, configures the socket, and then creates a Thrift
    * transport using the socket.
    *
@@ -145,7 +80,7 @@ public class TTimeoutTransport {
    * @throws IOException
    *           If the Thrift client is failed to be connected/created
    */
-  protected TTransport createInternal(SocketAddress addr, long timeoutMillis) throws IOException {
+  TTransport createInternal(SocketAddress addr, long timeoutMillis) throws IOException {
     Socket socket = null;
     try {
       socket = openSocket(addr);
@@ -174,12 +109,12 @@ public class TTimeoutTransport {
   }
 
   // Visible for testing
-  protected InputStream wrapInputStream(Socket socket, long timeoutMillis) throws IOException {
-    return new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
+  InputStream wrapInputStream(Socket socket, long timeoutMillis) throws IOException {
+    return new BufferedInputStream(NetUtils.getInputStream(socket, timeoutMillis), 1024 * 10);
   }
 
   // Visible for testing
-  protected OutputStream wrapOutputStream(Socket socket, long timeoutMillis) throws IOException {
+  OutputStream wrapOutputStream(Socket socket, long timeoutMillis) throws IOException {
     return new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
   }
 
@@ -190,7 +125,7 @@ public class TTimeoutTransport {
    *          The address to connect the socket to
    * @return A socket connected to the given address, or null if the socket fails to connect
    */
-  protected Socket openSocket(SocketAddress addr) throws IOException {
+  Socket openSocket(SocketAddress addr) throws IOException {
     Socket socket = null;
     try {
       socket = openSocketChannel();
@@ -213,7 +148,7 @@ public class TTimeoutTransport {
   /**
    * Opens a socket channel and returns the underlying socket.
    */
-  protected Socket openSocketChannel() throws IOException {
+  Socket openSocketChannel() throws IOException {
     return SelectorProvider.provider().openSocketChannel().socket();
   }
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 1892b59..4b0bc82 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -32,12 +32,12 @@ import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.lang.reflect.Method;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -175,7 +175,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
         }
         workQueue.drainTo(work);
 
-        Method durabilityMethod = null;
+        Optional<Boolean> shouldHSync = Optional.empty();
         loop: for (LogWork logWork : work) {
           switch (logWork.durability) {
             case DEFAULT:
@@ -184,11 +184,11 @@ public class DfsLogger implements Comparable<DfsLogger> {
               // shouldn't make it to the work queue
               throw new IllegalArgumentException("unexpected durability " + logWork.durability);
             case SYNC:
-              durabilityMethod = sync;
+              shouldHSync = Optional.of(Boolean.TRUE);
               break loop;
             case FLUSH:
-              if (durabilityMethod == null) {
-                durabilityMethod = flush;
+              if (shouldHSync.isEmpty()) {
+                shouldHSync = Optional.of(Boolean.FALSE);
               }
               break;
           }
@@ -196,15 +196,16 @@ public class DfsLogger implements Comparable<DfsLogger> {
 
         long start = System.currentTimeMillis();
         try {
-          if (durabilityMethod != null) {
-            durabilityMethod.invoke(logFile);
-            if (durabilityMethod == sync) {
+          if (shouldHSync.isPresent()) {
+            if (shouldHSync.get()) {
+              logFile.hsync();
               syncCounter.incrementAndGet();
             } else {
+              logFile.hflush();
               flushCounter.incrementAndGet();
             }
           }
-        } catch (Exception ex) {
+        } catch (IOException | RuntimeException ex) {
           fail(work, ex, "synching");
         }
         long duration = System.currentTimeMillis() - start;
@@ -322,8 +323,6 @@ public class DfsLogger implements Comparable<DfsLogger> {
   private final ServerResources conf;
   private FSDataOutputStream logFile;
   private DataOutputStream encryptingLogFile = null;
-  private Method sync;
-  private Method flush;
   private String logPath;
   private Daemon syncThread;
 
@@ -442,8 +441,6 @@ public class DfsLogger implements Comparable<DfsLogger> {
         logFile = fs.createSyncable(logfilePath, 0, replication, blockSize);
       else
         logFile = fs.create(logfilePath, true, 0, replication, blockSize);
-      sync = logFile.getClass().getMethod("hsync");
-      flush = logFile.getClass().getMethod("hflush");
 
       // check again that logfile can be sync'd
       if (!fs.canSyncAndFlush(logfilePath)) {
diff --git a/start/src/main/java/org/apache/accumulo/start/Main.java b/start/src/main/java/org/apache/accumulo/start/Main.java
index 76d3def..e147106 100644
--- a/start/src/main/java/org/apache/accumulo/start/Main.java
+++ b/start/src/main/java/org/apache/accumulo/start/Main.java
@@ -45,8 +45,7 @@ public class Main {
   public static void main(final String[] args) {
     try {
       // Preload classes that cause a deadlock between the ServiceLoader and the DFSClient when
-      // using
-      // the VFSClassLoader with jars in HDFS.
+      // using the VFSClassLoader with jars in HDFS.
       ClassLoader loader = getClassLoader();
       Class<?> confClass = null;
       try {