You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2019/04/05 14:20:11 UTC

[accumulo] branch master updated: Revert thrift server to custom_hs_ha (#1075)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 503f351  Revert thrift server to custom_hs_ha (#1075)
503f351 is described below

commit 503f351e0897b640d2256af22f7c22cbe73b7287
Author: Keith Turner <kt...@apache.org>
AuthorDate: Fri Apr 5 10:20:06 2019 -0400

    Revert thrift server to custom_hs_ha (#1075)
    
    After the changes in #1059, some of the integration tests (ITs) were
    failing: the new threaded-selector Thrift server was failing. This change
    reverts the default to the previous Thrift server type (custom_hs_ha)
    while leaving the new type available through configuration.
---
 .../server/rpc/CustomThreadedSelectorServer.java   | 23 +++++++++++++---------
 .../apache/accumulo/server/rpc/TServerUtils.java   | 12 +++++------
 .../accumulo/server/rpc/ThriftServerType.java      |  5 ++---
 .../accumulo/server/rpc/ThriftServerTypeTest.java  |  2 +-
 4 files changed, 23 insertions(+), 19 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomThreadedSelectorServer.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomThreadedSelectorServer.java
index c5a898d..00796f9 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomThreadedSelectorServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomThreadedSelectorServer.java
@@ -22,6 +22,7 @@ import java.net.Socket;
 import org.apache.thrift.server.TThreadedSelectorServer;
 import org.apache.thrift.transport.TNonblockingSocket;
 import org.apache.thrift.transport.TNonblockingTransport;
+import org.slf4j.LoggerFactory;
 
 public class CustomThreadedSelectorServer extends TThreadedSelectorServer {
 
@@ -50,17 +51,21 @@ public class CustomThreadedSelectorServer extends TThreadedSelectorServer {
   protected Runnable getRunnable(FrameBuffer frameBuffer) {
     return () -> {
 
-      TNonblockingTransport transport = getTransport(frameBuffer);
+      try {
+        TNonblockingTransport transport = getTransport(frameBuffer);
 
-      if (transport instanceof TNonblockingSocket) {
-        // This block of code makes the client address available to the server side code that
-        // executes a RPC. It is made available for informational purposes.
-        TNonblockingSocket tsock = (TNonblockingSocket) transport;
-        Socket sock = tsock.getSocketChannel().socket();
-        TServerUtils.clientAddress
-            .set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
+        if (transport instanceof TNonblockingSocket) {
+          // This block of code makes the client address available to the server side code that
+          // executes a RPC. It is made available for informational purposes.
+          TNonblockingSocket tsock = (TNonblockingSocket) transport;
+          Socket sock = tsock.getSocketChannel().socket();
+          TServerUtils.clientAddress
+              .set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
+        }
+      } catch (Exception e) {
+        LoggerFactory.getLogger(CustomThreadedSelectorServer.class)
+            .warn("Failed to get client address ", e);
       }
-
       frameBuffer.invoke();
     };
   }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index d112bb5..4541ea1 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -607,18 +607,18 @@ public class TServerUtils {
             serverAddress = createBlockingServer(address, processor, protocolFactory,
                 maxMessageSize, serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
             break;
+          case THREADED_SELECTOR:
+            log.debug("Instantiating default, unsecure Threaded selector Thrift server");
+            serverAddress = createThreadedSelectorServer(address, processor, protocolFactory,
+                serverName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
+            break;
           case CUSTOM_HS_HA:
             log.debug("Instantiating unsecure custom half-async Thrift server");
             serverAddress = createNonBlockingServer(address, processor, protocolFactory, serverName,
                 numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
             break;
-          case THREADED_SELECTOR: // Intentional passthrough -- Our custom wrapper around threaded
-                                  // selector is the default
           default:
-            log.debug("Instantiating default, unsecure Threaded selector Thrift server");
-            serverAddress = createThreadedSelectorServer(address, processor, protocolFactory,
-                serverName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
-            break;
+            throw new IllegalArgumentException("Unknown server type " + serverType);
         }
         break;
       } catch (TTransportException e) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java
index 04f64e9..29ecc53 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java
@@ -41,9 +41,8 @@ public enum ThriftServerType {
   }
 
   public static ThriftServerType get(String name) {
-    // Our custom HsHa server is the default (if none is provided)
     if (StringUtils.isBlank(name)) {
-      return THREADED_SELECTOR;
+      return getDefault();
     }
     return ThriftServerType.valueOf(name.trim().toUpperCase());
   }
@@ -54,6 +53,6 @@ public enum ThriftServerType {
   }
 
   public static ThriftServerType getDefault() {
-    return THREADED_SELECTOR;
+    return CUSTOM_HS_HA;
   }
 }
diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java
index 01bc513..708f70e 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java
@@ -25,7 +25,7 @@ public class ThriftServerTypeTest {
 
   @Test
   public void testDefaultServer() {
-    assertEquals(ThriftServerType.THREADED_SELECTOR,
+    assertEquals(ThriftServerType.CUSTOM_HS_HA,
         ThriftServerType.get(Property.GENERAL_RPC_SERVER_TYPE.getDefaultValue()));
   }