You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2020/03/06 17:03:35 UTC
[accumulo] branch master updated: Remove some reflection where not
needed (#1550)
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new fb68612 Remove some reflection where not needed (#1550)
fb68612 is described below
commit fb6861200bec7d45f44419a89e64a2fbe913104b
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Fri Mar 6 12:03:24 2020 -0500
Remove some reflection where not needed (#1550)
Remove unnecessary reflection to load methods that are contained in
public stable Hadoop APIs.
This reflection was added for simultaneous Hadoop 1 and Hadoop 2
support. We now require Hadoop 3 and these APIs are stable, so
reflection is not needed.
Also reduce visibility of some methods in TTimeoutTransport exposed for
testing and delete unnecessary code.
---
.../accumulo/core/rpc/TTimeoutTransport.java | 77 ++--------------------
.../org/apache/accumulo/tserver/log/DfsLogger.java | 23 +++----
.../main/java/org/apache/accumulo/start/Main.java | 3 +-
3 files changed, 17 insertions(+), 86 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
index c74a5a3..10bff34 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
@@ -23,7 +23,6 @@ import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
@@ -50,56 +49,8 @@ public class TTimeoutTransport {
private static final TTimeoutTransport INSTANCE = new TTimeoutTransport();
- private volatile Method GET_INPUT_STREAM_METHOD = null;
-
private TTimeoutTransport() {}
- private Method getNetUtilsInputStreamMethod() {
- if (GET_INPUT_STREAM_METHOD == null) {
- synchronized (this) {
- if (GET_INPUT_STREAM_METHOD == null) {
- try {
- GET_INPUT_STREAM_METHOD =
- NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- return GET_INPUT_STREAM_METHOD;
- }
-
- /**
- * Invokes the <code>NetUtils.getInputStream(Socket, long)</code> using reflection to handle
- * compatibility with both Hadoop 1 and 2.
- *
- * @param socket
- * The socket to create the input stream on
- * @param timeout
- * The timeout for the input stream in milliseconds
- * @return An InputStream on the socket
- */
- private InputStream getInputStream(Socket socket, long timeout) throws IOException {
- try {
- return (InputStream) getNetUtilsInputStreamMethod().invoke(null, socket, timeout);
- } catch (Exception e) {
- Throwable cause = e.getCause();
- // Try to re-throw the IOException directly
- if (cause instanceof IOException) {
- throw (IOException) cause;
- }
-
- if (e instanceof RuntimeException) {
- // Don't re-wrap another RTE around an RTE
- throw (RuntimeException) e;
- } else {
- throw new RuntimeException(e);
- }
- }
- }
-
/**
* Creates a Thrift TTransport to the given address with the given timeout. All created resources
* are closed if an exception is thrown.
@@ -118,22 +69,6 @@ public class TTimeoutTransport {
}
/**
- * Creates a Thrift TTransport to the given address with the given timeout. All created resources
- * are closed if an exception is thrown.
- *
- * @param addr
- * The address to connect the client to
- * @param timeoutMillis
- * The timeout in milliseconds for the connection
- * @return A TTransport connected to the given <code>addr</code>
- * @throws IOException
- * If the transport fails to be created/connected
- */
- public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException {
- return INSTANCE.createInternal(addr, timeoutMillis);
- }
-
- /**
* Opens a socket to the given <code>addr</code>, configures the socket, and then creates a Thrift
* transport using the socket.
*
@@ -145,7 +80,7 @@ public class TTimeoutTransport {
* @throws IOException
* If the Thrift client is failed to be connected/created
*/
- protected TTransport createInternal(SocketAddress addr, long timeoutMillis) throws IOException {
+ TTransport createInternal(SocketAddress addr, long timeoutMillis) throws IOException {
Socket socket = null;
try {
socket = openSocket(addr);
@@ -174,12 +109,12 @@ public class TTimeoutTransport {
}
// Visible for testing
- protected InputStream wrapInputStream(Socket socket, long timeoutMillis) throws IOException {
- return new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
+ InputStream wrapInputStream(Socket socket, long timeoutMillis) throws IOException {
+ return new BufferedInputStream(NetUtils.getInputStream(socket, timeoutMillis), 1024 * 10);
}
// Visible for testing
- protected OutputStream wrapOutputStream(Socket socket, long timeoutMillis) throws IOException {
+ OutputStream wrapOutputStream(Socket socket, long timeoutMillis) throws IOException {
return new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
}
@@ -190,7 +125,7 @@ public class TTimeoutTransport {
* The address to connect the socket to
* @return A socket connected to the given address, or null if the socket fails to connect
*/
- protected Socket openSocket(SocketAddress addr) throws IOException {
+ Socket openSocket(SocketAddress addr) throws IOException {
Socket socket = null;
try {
socket = openSocketChannel();
@@ -213,7 +148,7 @@ public class TTimeoutTransport {
/**
* Opens a socket channel and returns the underlying socket.
*/
- protected Socket openSocketChannel() throws IOException {
+ Socket openSocketChannel() throws IOException {
return SelectorProvider.provider().openSocketChannel().socket();
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 1892b59..4b0bc82 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -32,12 +32,12 @@ import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
-import java.lang.reflect.Method;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@@ -175,7 +175,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
}
workQueue.drainTo(work);
- Method durabilityMethod = null;
+ Optional<Boolean> shouldHSync = Optional.empty();
loop: for (LogWork logWork : work) {
switch (logWork.durability) {
case DEFAULT:
@@ -184,11 +184,11 @@ public class DfsLogger implements Comparable<DfsLogger> {
// shouldn't make it to the work queue
throw new IllegalArgumentException("unexpected durability " + logWork.durability);
case SYNC:
- durabilityMethod = sync;
+ shouldHSync = Optional.of(Boolean.TRUE);
break loop;
case FLUSH:
- if (durabilityMethod == null) {
- durabilityMethod = flush;
+ if (shouldHSync.isEmpty()) {
+ shouldHSync = Optional.of(Boolean.FALSE);
}
break;
}
@@ -196,15 +196,16 @@ public class DfsLogger implements Comparable<DfsLogger> {
long start = System.currentTimeMillis();
try {
- if (durabilityMethod != null) {
- durabilityMethod.invoke(logFile);
- if (durabilityMethod == sync) {
+ if (shouldHSync.isPresent()) {
+ if (shouldHSync.get()) {
+ logFile.hsync();
syncCounter.incrementAndGet();
} else {
+ logFile.hflush();
flushCounter.incrementAndGet();
}
}
- } catch (Exception ex) {
+ } catch (IOException | RuntimeException ex) {
fail(work, ex, "synching");
}
long duration = System.currentTimeMillis() - start;
@@ -322,8 +323,6 @@ public class DfsLogger implements Comparable<DfsLogger> {
private final ServerResources conf;
private FSDataOutputStream logFile;
private DataOutputStream encryptingLogFile = null;
- private Method sync;
- private Method flush;
private String logPath;
private Daemon syncThread;
@@ -442,8 +441,6 @@ public class DfsLogger implements Comparable<DfsLogger> {
logFile = fs.createSyncable(logfilePath, 0, replication, blockSize);
else
logFile = fs.create(logfilePath, true, 0, replication, blockSize);
- sync = logFile.getClass().getMethod("hsync");
- flush = logFile.getClass().getMethod("hflush");
// check again that logfile can be sync'd
if (!fs.canSyncAndFlush(logfilePath)) {
diff --git a/start/src/main/java/org/apache/accumulo/start/Main.java b/start/src/main/java/org/apache/accumulo/start/Main.java
index 76d3def..e147106 100644
--- a/start/src/main/java/org/apache/accumulo/start/Main.java
+++ b/start/src/main/java/org/apache/accumulo/start/Main.java
@@ -45,8 +45,7 @@ public class Main {
public static void main(final String[] args) {
try {
// Preload classes that cause a deadlock between the ServiceLoader and the DFSClient when
- // using
- // the VFSClassLoader with jars in HDFS.
+ // using the VFSClassLoader with jars in HDFS.
ClassLoader loader = getClassLoader();
Class<?> confClass = null;
try {