You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2020/04/02 15:53:24 UTC

[accumulo] branch master updated: Skip restricted ports in TServer port search (#1575)

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 72e2794  Skip restricted ports in TServer port search (#1575)
72e2794 is described below

commit 72e27947829d5d29b513902a1391a1b9eeb77c9a
Author: Arvind Shyamsundar <ar...@apache.org>
AuthorDate: Thu Apr 2 08:53:17 2020 -0700

    Skip restricted ports in TServer port search (#1575)
    
    * Skip restricted ports in TServer port search
    
    Fixes #1573. Includes debug logging for ports being skipped.
    
    * Use Java Streams and add Unit Test
    
    * Using streams to make the reserved port enumeration code more declarative
    * Added a comprehensive unit test to check that reserved ports are
      skipped during TServer startup as part of port search
---
 .../apache/accumulo/server/rpc/TServerUtils.java   | 33 ++++++++++-
 .../accumulo/server/util/TServerUtilsTest.java     | 67 ++++++++++++++++++++++
 2 files changed, 99 insertions(+), 1 deletion(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 369eeb5..d07972d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -27,20 +27,25 @@ import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.UnknownHostException;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
 
 import javax.net.ssl.SSLServerSocket;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.PropertyType;
 import org.apache.accumulo.core.rpc.SslConnectionParams;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.fate.util.LoggingRunnable;
 import org.apache.accumulo.server.ServerContext;
@@ -94,6 +99,21 @@ public class TServerUtils {
   }
 
   /**
+   * Builds the map of ports claimed by every PORT-type property other than TSERV_CLIENTPORT.
+   *
+   * @param config
+   *          Accumulo configuration
+   * @return a map from each reserved port number to a Property configured with it; if several
+   *         properties share a port, the first one encountered is kept
+   */
+  public static Map<Integer,Property> getReservedPorts(AccumuloConfiguration config) {
+    return EnumSet.allOf(Property.class).stream()
+        .filter(p -> p.getType() == PropertyType.PORT && p != Property.TSERV_CLIENTPORT)
+        .flatMap(rp -> Arrays.stream(config.getPort(rp)).mapToObj(port -> new Pair<>(port, rp)))
+        .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond, (first, dup) -> first));
+  }
+
+  /**
    * Start a server, at the given port, or higher, if that port is not available.
    *
    * @param service
@@ -155,7 +175,8 @@ public class TServerUtils {
       processor = updateSaslProcessor(serverType, processor);
     }
 
-    // create the TimedProcessor outside the port search loop so we don't try to register the same
+    // create the TimedProcessor outside the port search loop so we don't try to
+    // register the same
     // metrics mbean more than once
     TimedProcessor timedProcessor =
         new TimedProcessor(metricsSystem, config, processor, serverName, threadName);
@@ -168,10 +189,20 @@ public class TServerUtils {
           addresses);
     } catch (TTransportException e) {
       if (portSearch) {
+        // Build a map of reserved ports - as identified by properties of type PropertyType.PORT
+        Map<Integer,Property> reservedPorts = getReservedPorts(config);
+
         HostAndPort last = addresses[addresses.length - 1];
         // Attempt to allocate a port outside of the specified port property
         // Search sequentially over the next 1000 ports
         for (int port = last.getPort() + 1; port < last.getPort() + 1001; port++) {
+          if (reservedPorts.keySet().contains(port)) {
+            log.debug("During port search, skipping reserved port {} - property {} ({})", port,
+                reservedPorts.get(port).getKey(), reservedPorts.get(port).getDescription());
+
+            continue;
+          }
+
           if (port > 65535) {
             break;
           }
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
index 8f8f587..21addaa 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.UnknownHostException;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
@@ -233,6 +234,72 @@ public class TServerUtilsTest {
     }
   }
 
+  @SuppressFBWarnings(value = "UNENCRYPTED_SERVER_SOCKET", justification = "socket for testing")
+  @Test
+  public void testStartServerNonDefaultPorts() throws Exception {
+    // Six roughly contiguous free ports are located and then handed out, in ascending order,
+    // to the following Accumulo services:
+    // 0. the TServer client port (bound by this test so that a port search is forced)
+    // 1. GC
+    // 2. Master
+    // 3. Monitor
+    // 4. Master Replication Coordinator
+    // 5. nothing - left free; the TServer is expected to end up here
+    int[] firstPair = findTwoFreeSequentialPorts(1024);
+    int tserverDefaultPort = firstPair[0];
+    int gcPort = firstPair[1];
+    ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.TSERV_CLIENTPORT,
+        String.valueOf(tserverDefaultPort));
+    ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.GC_PORT,
+        String.valueOf(gcPort));
+
+    int[] secondPair = findTwoFreeSequentialPorts(gcPort + 1);
+    int masterPort = secondPair[0];
+    int monitorPort = secondPair[1];
+    ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.MASTER_CLIENTPORT,
+        String.valueOf(masterPort));
+    ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.MONITOR_PORT,
+        String.valueOf(monitorPort));
+
+    int[] thirdPair = findTwoFreeSequentialPorts(monitorPort + 1);
+    int masterReplCoordPort = thirdPair[0];
+    int tserverFinalPort = thirdPair[1];
+    ((ConfigurationCopy) factory.getSystemConfiguration())
+        .set(Property.MASTER_REPLICATION_COORDINATOR_PORT, String.valueOf(masterReplCoordPort));
+
+    // Turn on port search so the TServer may look beyond its configured client port
+    ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.TSERV_PORTSEARCH, "true");
+
+    Map<Integer,Property> reservedPorts =
+        TServerUtils.getReservedPorts(factory.getSystemConfiguration());
+
+    // The TServer client port itself must never be reported as reserved
+    assertTrue(!reservedPorts.containsKey(tserverDefaultPort));
+
+    // Every port handed to another service above must be reported as reserved
+    assertTrue(reservedPorts.containsKey(gcPort));
+    assertTrue(reservedPorts.containsKey(masterPort));
+    assertTrue(reservedPorts.containsKey(monitorPort));
+    assertTrue(reservedPorts.containsKey(masterReplCoordPort));
+
+    TServer server = null;
+    InetAddress localhost = InetAddress.getByName("localhost");
+    // Occupy the configured client port so that startServer() has to search for another one
+    try (ServerSocket blocker = new ServerSocket(tserverDefaultPort, 50, localhost)) {
+      ServerAddress started = startServer();
+      assertNotNull(started);
+      server = started.getServer();
+      assertNotNull(server);
+
+      // The search should have skipped the reserved ports and settled on the free one
+      assertTrue(tserverFinalPort == started.getAddress().getPort());
+    } finally {
+      if (server != null) {
+        TServerUtils.stopTServer(server);
+      }
+    }
+  }
+
   @Test
   public void testStartServerPortRange() throws Exception {
     TServer server = null;