You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2020/04/02 15:53:24 UTC
[accumulo] branch master updated: Skip restricted ports in TServer
port search (#1575)
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 72e2794 Skip restricted ports in TServer port search (#1575)
72e2794 is described below
commit 72e27947829d5d29b513902a1391a1b9eeb77c9a
Author: Arvind Shyamsundar <ar...@apache.org>
AuthorDate: Thu Apr 2 08:53:17 2020 -0700
Skip restricted ports in TServer port search (#1575)
* Skip restricted ports in TServer port search
Fixes #1573. Includes debug logging for ports being skipped.
* Use Java Streams and add Unit Test
* Using streams to make the reserved port enumeration code more declarative
* Added a comprehensive unit test to check that reserved ports are
skipped during TServer startup as part of port search
---
.../apache/accumulo/server/rpc/TServerUtils.java | 33 ++++++++++-
.../accumulo/server/util/TServerUtilsTest.java | 67 ++++++++++++++++++++++
2 files changed, 99 insertions(+), 1 deletion(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 369eeb5..d07972d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -27,20 +27,25 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.util.Arrays;
+import java.util.EnumSet;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
import javax.net.ssl.SSLServerSocket;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.PropertyType;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.fate.util.LoggingRunnable;
import org.apache.accumulo.server.ServerContext;
@@ -94,6 +99,21 @@ public class TServerUtils {
}
/**
+ * Collects the ports claimed by every port-typed property other than the TServer client port.
+ *
+ * <p>
+ * Every {@link Property} whose type is {@link PropertyType#PORT}, except
+ * {@link Property#TSERV_CLIENTPORT}, is resolved through {@code config.getPort} and each
+ * resulting port number is mapped to the property that claims it. When more than one property
+ * resolves to the same port number, the first property encountered is kept rather than failing
+ * with an {@link IllegalStateException} for the duplicate map key.
+ *
+ * @param config
+ * Accumulo configuration
+ * @return A Map object with reserved port numbers as keys and Property objects as values
+ */
+ public static Map<Integer,Property> getReservedPorts(AccumuloConfiguration config) {
+   return EnumSet.allOf(Property.class).stream()
+       .filter(p -> p.getType() == PropertyType.PORT && p != Property.TSERV_CLIENTPORT)
+       .flatMap(rp -> {
+         return Arrays.stream(config.getPort(rp)).mapToObj(portNum -> new Pair<>(portNum, rp));
+       })
+       // Two properties may resolve to the same port number; the two-argument toMap would
+       // throw on the duplicate key, so keep the first property seen for that port instead.
+       .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond, (first, second) -> first));
+ }
+
+ /**
* Start a server, at the given port, or higher, if that port is not available.
*
* @param service
@@ -155,7 +175,8 @@ public class TServerUtils {
processor = updateSaslProcessor(serverType, processor);
}
- // create the TimedProcessor outside the port search loop so we don't try to register the same
+ // create the TimedProcessor outside the port search loop so we don't try to
+ // register the same
// metrics mbean more than once
TimedProcessor timedProcessor =
new TimedProcessor(metricsSystem, config, processor, serverName, threadName);
@@ -168,10 +189,20 @@ public class TServerUtils {
addresses);
} catch (TTransportException e) {
if (portSearch) {
+ // Build a list of reserved ports - as identified by properties of type PropertyType.PORT
+ Map<Integer,Property> reservedPorts = getReservedPorts(config);
+
HostAndPort last = addresses[addresses.length - 1];
// Attempt to allocate a port outside of the specified port property
// Search sequentially over the next 1000 ports
for (int port = last.getPort() + 1; port < last.getPort() + 1001; port++) {
+ if (reservedPorts.keySet().contains(port)) {
+ log.debug("During port search, skipping reserved port {} - property {} ({})", port,
+ reservedPorts.get(port).getKey(), reservedPorts.get(port).getDescription());
+
+ continue;
+ }
+
if (port > 65535) {
break;
}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
index 8f8f587..21addaa 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
@@ -31,6 +31,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
@@ -233,6 +234,72 @@ public class TServerUtilsTest {
}
}
+ @SuppressFBWarnings(value = "UNENCRYPTED_SERVER_SOCKET", justification = "socket for testing")
+ @Test
+ public void testStartServerNonDefaultPorts() throws Exception {
+ TServer server = null;
+
+ // This test obtains 6 free ports (three pairs from findTwoFreeSequentialPorts, so they are
+ // more-or-less contiguous) and assigns them to Accumulo services in ascending order:
+ // 0. TServer default client port (this test binds to this port to force a port search)
+ // 1. GC
+ // 2. Master
+ // 3. Monitor
+ // 4. Master Replication Coordinator
+ // 5. One free port - the one we expect the TServer to finally use (NOTE(review): this assumes no free, unreserved port lies between the pairs - confirm)
+ int[] ports = findTwoFreeSequentialPorts(1024);
+ int tserverDefaultPort = ports[0];
+ ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.TSERV_CLIENTPORT,
+ Integer.toString(tserverDefaultPort));
+ int gcPort = ports[1];
+ ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.GC_PORT,
+ Integer.toString(gcPort));
+
+ ports = findTwoFreeSequentialPorts(gcPort + 1);
+ int masterPort = ports[0];
+ ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.MASTER_CLIENTPORT,
+ Integer.toString(masterPort));
+ int monitorPort = ports[1];
+ ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.MONITOR_PORT,
+ Integer.toString(monitorPort));
+
+ ports = findTwoFreeSequentialPorts(monitorPort + 1);
+ int masterReplCoordPort = ports[0];
+ ((ConfigurationCopy) factory.getSystemConfiguration())
+ .set(Property.MASTER_REPLICATION_COORDINATOR_PORT, Integer.toString(masterReplCoordPort));
+ int tserverFinalPort = ports[1];
+
+ ((ConfigurationCopy) factory.getSystemConfiguration()).set(Property.TSERV_PORTSEARCH, "true");
+
+ // The TServer client port itself must NOT be reported as reserved, otherwise it could never be used
+ Map<Integer,Property> reservedPorts =
+ TServerUtils.getReservedPorts(factory.getSystemConfiguration());
+ assertTrue(!reservedPorts.keySet().contains(tserverDefaultPort));
+
+ // Ensure that all the ports we assigned (GC, Master, Monitor, Master Replication Coordinator)
+ // are included in the reserved ports as returned by TServerUtils
+ assertTrue(reservedPorts.keySet().contains(gcPort));
+ assertTrue(reservedPorts.keySet().contains(masterPort));
+ assertTrue(reservedPorts.keySet().contains(monitorPort));
+ assertTrue(reservedPorts.keySet().contains(masterReplCoordPort));
+
+ InetAddress addr = InetAddress.getByName("localhost");
+ try (ServerSocket s = new ServerSocket(tserverDefaultPort, 50, addr)) {
+ ServerAddress address = startServer();
+ assertNotNull(address);
+ server = address.getServer();
+ assertNotNull(server);
+
+ // Finally ensure that the TServer ended up on the last port, i.e. the port search skipped every reserved port
+ assertTrue(address.getAddress().getPort() == tserverFinalPort);
+ } finally {
+ if (null != server) {
+ TServerUtils.stopTServer(server);
+ }
+
+ }
+ }
+
@Test
public void testStartServerPortRange() throws Exception {
TServer server = null;