You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by jm...@apache.org on 2014/10/22 21:10:15 UTC
git commit: SLIDER-505 enable port range specification for AM
Repository: incubator-slider
Updated Branches:
refs/heads/develop e13d20e3a -> 6be4bfadc
SLIDER-505 enable port range specification for AM
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/6be4bfad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/6be4bfad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/6be4bfad
Branch: refs/heads/develop
Commit: 6be4bfadcec39f6912861277d74e0d8e0a46e603
Parents: e13d20e
Author: Jon Maron <jm...@hortonworks.com>
Authored: Wed Oct 22 15:09:54 2014 -0400
Committer: Jon Maron <jm...@hortonworks.com>
Committed: Wed Oct 22 15:09:54 2014 -0400
----------------------------------------------------------------------
.../org/apache/slider/common/SliderKeys.java | 2 +
.../apache/slider/common/tools/PortScanner.java | 90 +++++++++++++++++++
.../server/appmaster/SliderAppMaster.java | 30 ++++++-
.../slider/common/tools/TestPortScan.groovy | 91 +++++++++++++++++++-
4 files changed, 208 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6be4bfad/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
index e75ec73..ae58ef3 100644
--- a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
+++ b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
@@ -182,4 +182,6 @@ public interface SliderKeys extends SliderXmlConfKeys {
*/
String AM_FILTER_NAME =
"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer";
+
+ String KEY_AM_ALLOWED_PORT_RANGE = "slider.am.allowed.port.range";
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6be4bfad/slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java b/slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java
new file mode 100644
index 0000000..0f4cfbc
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.common.tools;
+
+import org.apache.slider.common.SliderExitCodes;
+import org.apache.slider.core.exceptions.SliderException;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Parses a port specification such as "5,6,8-10" into a sorted list of candidate ports and hands out available ones.
+ */
+public class PortScanner {
+ private static Pattern NUMBER_RANGE = Pattern.compile("^(\\d+)\\s*-\\s*(\\d+)$"); // "start-end", optional whitespace around the dash
+ private static Pattern SINGLE_NUMBER = Pattern.compile("^\\d+$"); // a single number
+
+ private List<Integer> remainingPortsToCheck; // ascending, de-duplicated candidates; null until setPortRange() is called
+
+ public PortScanner() {
+ }
+
+ public void setPortRange(String input) { // replaces any previously configured candidate list
+ // split on commas; each trimmed token is either a single number or a "start-end" range
+ Set<Integer> inputPorts= new TreeSet<Integer>(); // TreeSet drops duplicates and keeps the ports sorted
+ String[] ranges = input.split(",");
+ for ( String range : ranges ) {
+ Matcher m = SINGLE_NUMBER.matcher(range.trim());
+ if (m.find()) {
+ inputPorts.add(Integer.parseInt(m.group()));
+ } else {
+ m = NUMBER_RANGE.matcher(range.trim());
+ if (m.find()) { // NOTE(review): a token matching neither pattern is silently skipped
+ String[] boundaryValues = m.group(0).split("-"); // re-splits the whole match rather than using groups 1 and 2
+ int start = Integer.parseInt(boundaryValues[0].trim());
+ int end = Integer.parseInt(boundaryValues[1].trim());
+ for (int i = start; i < end + 1; i++) { // end is inclusive; start > end adds nothing
+ inputPorts.add(i);
+ }
+ }
+ }
+ }
+ this.remainingPortsToCheck = new ArrayList<Integer>(inputPorts);
+ }
+
+ public List<Integer> getRemainingPortsToCheck() { // returns the live internal list, not a copy
+ return remainingPortsToCheck;
+ }
+
+ public int getAvailablePort () throws SliderException{ // NOTE(review): NullPointerException if setPortRange() was never called
+ boolean found = false;
+ int availablePort = -1;
+ Iterator<Integer> portsToCheck = this.remainingPortsToCheck.iterator();
+ while (portsToCheck.hasNext() && !found) { // stop at the first port reported available
+ int portToCheck = portsToCheck.next();
+ found = SliderUtils.isPortAvailable(portToCheck);
+ if (found) {
+ availablePort = portToCheck;
+ portsToCheck.remove(); // drop it from the candidates so a later call does not return it again
+ }
+ }
+
+ if (availablePort < 0) { // every remaining candidate was checked without a hit
+ throw new SliderException(SliderExitCodes.EXIT_BAD_CONFIGURATION,
+ "No available ports found in configured range {}", // NOTE(review): confirm SliderException expands "{}"; a String.format-based constructor would need "%s"
+ remainingPortsToCheck);
+ }
+
+ return availablePort;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6be4bfad/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index d696f45..e7fa109 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -88,6 +88,7 @@ import org.apache.slider.common.params.SliderAMArgs;
import org.apache.slider.common.params.SliderAMCreateAction;
import org.apache.slider.common.params.SliderActions;
import org.apache.slider.common.tools.ConfigHelper;
+import org.apache.slider.common.tools.PortScanner;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.common.tools.SliderVersionInfo;
@@ -375,6 +376,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
private YarnRegistryViewForProviders yarnRegistryOperations;
private FsDelegationTokenManager fsDelegationTokenManager;
private RegisterApplicationMasterResponse amRegistrationData;
+ private PortScanner portScanner;
/**
* Service Constructor
@@ -656,7 +658,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
secretManager = new ClientToAMTokenSecretManager(appAttemptID, null);
//bring up the Slider RPC service
- startSliderRPCServer();
+ startSliderRPCServer(instanceDefinition);
rpcServiceAddress = rpcService.getConnectAddress();
appMasterHostname = rpcServiceAddress.getHostName();
@@ -690,6 +692,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
startAgentWebApp(appInformation, serviceConf);
+ int port = getPortToRequest(instanceDefinition);
+
webApp = new SliderAMWebApp(registryOperations);
WebApps.$for(SliderAMWebApp.BASE_PATH, WebAppApi.class,
new WebAppApiImpl(this,
@@ -698,6 +702,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
certificateManager, registryOperations),
RestPaths.WS_CONTEXT)
.withHttpPolicy(serviceConf, HttpConfig.Policy.HTTP_ONLY)
+ .at(port)
.start(webApp);
String scheme = WebAppUtils.HTTP_PREFIX;
appMasterTrackingUrl = scheme + appMasterHostname + ":" + webApp.port();
@@ -900,6 +905,23 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
return finish();
}
+ private int getPortToRequest(AggregateConf instanceDefinition)
+ throws SliderException { // picks the port to bind: one from the AM's configured range, or 0 when no range is set
+ int portToRequest = 0; // returned unchanged when no port range is configured
+ String portRange = instanceDefinition.
+ getAppConfOperations().getComponent(SliderKeys.COMPONENT_AM)
+ .getOption(SliderKeys.KEY_AM_ALLOWED_PORT_RANGE , "0"); // "0" is the default passed for a missing option
+ if (!"0".equals(portRange)) {
+ if (portScanner == null) { // scanner is built once and cached in the portScanner field
+ portScanner = new PortScanner();
+ portScanner.setPortRange(portRange);
+ }
+ portToRequest = portScanner.getAvailablePort(); // throws SliderException when no candidate port is available
+ }
+
+ return portToRequest;
+ }
+
private void uploadServerCertForLocalization(String clustername,
SliderFileSystem fs)
throws IOException {
@@ -1334,7 +1356,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
/**
* Start the slider RPC server
*/
- private void startSliderRPCServer() throws IOException, BadConfigException {
+ private void startSliderRPCServer(AggregateConf instanceDefinition)
+ throws IOException, SliderException {
// verify that if the cluster is authed, the ACLs are set.
boolean authorization = getConfig().getBoolean(
@@ -1352,9 +1375,10 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
.newReflectiveBlockingService(
protobufRelay);
+ int port = getPortToRequest(instanceDefinition);
rpcService =
new WorkflowRpcService("SliderRPC", RpcBinder.createProtobufServer(
- new InetSocketAddress("0.0.0.0", 0),
+ new InetSocketAddress("0.0.0.0", port),
getConfig(),
secretManager,
NUM_RPC_HANDLERS,
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6be4bfad/slider-core/src/test/groovy/org/apache/slider/common/tools/TestPortScan.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/common/tools/TestPortScan.groovy b/slider-core/src/test/groovy/org/apache/slider/common/tools/TestPortScan.groovy
index 49bd58e..f009e25 100644
--- a/slider-core/src/test/groovy/org/apache/slider/common/tools/TestPortScan.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/common/tools/TestPortScan.groovy
@@ -18,11 +18,11 @@
package org.apache.slider.common.tools
-import groovy.transform.CompileStatic
+import org.apache.slider.core.exceptions.SliderException
import org.junit.Test
-@CompileStatic
class TestPortScan {
+ final shouldFail = new GroovyTestCase().&shouldFail
@Test
public void testScanPorts() throws Throwable {
@@ -38,4 +38,91 @@ class TestPortScan {
server.close()
}
}
+
+ @Test
+ public void testRequestedPortsLogic() throws Throwable { // single ports and ranges expand into one flat port list
+ PortScanner portScanner = new PortScanner()
+ portScanner.setPortRange("5,6,8-10, 11,14 ,20 - 22") // covers "a-b", "a - b" and singles padded with spaces
+ List<Integer> ports = portScanner.remainingPortsToCheck
+ def expectedPorts = [5,6,8,9,10,11,14,20,21,22] // range ends are inclusive
+ assert ports == expectedPorts
+ }
+
+ @Test
+ public void testRequestedPortsOutOfOrder() throws Throwable { // entries given out of order still yield an ascending list
+ PortScanner portScanner = new PortScanner()
+ portScanner.setPortRange("8-10,5,6, 11,20 - 22, 14 ") // same ports as testRequestedPortsLogic, shuffled
+ List<Integer> ports = portScanner.remainingPortsToCheck
+ def expectedPorts = [5,6,8,9,10,11,14,20,21,22]
+ assert ports == expectedPorts
+ }
+
+ @Test
+ public void testFindAvailablePortInRange() throws Throwable { // a range containing one occupied port must yield a different port
+ ServerSocket server = new ServerSocket(0) // port 0: the system picks the port; it stays bound for the whole test
+ try {
+ int serverPort = server.getLocalPort()
+
+ PortScanner portScanner = new PortScanner()
+ portScanner.setPortRange("" + (serverPort-1) + "-" + (serverPort + 3)) // five-port range that includes the occupied port
+ int port = portScanner.availablePort
+ assert port != serverPort
+ assert port >= serverPort -1 && port <= serverPort + 3 // result must lie inside the requested range
+ } finally {
+ server.close()
+ }
+ }
+
+ @Test
+ public void testFindAvailablePortInList() throws Throwable { // same check as the range test, using a comma-separated list
+ ServerSocket server = new ServerSocket(0) // port 0: the system picks the port; it stays bound for the whole test
+ try {
+ int serverPort = server.getLocalPort()
+
+ PortScanner portScanner = new PortScanner()
+ portScanner.setPortRange("" + (serverPort-1) + ", " + (serverPort + 1)) // the two neighbours of the occupied port
+ int port = portScanner.availablePort
+ assert port != serverPort
+ assert port == serverPort -1 || port == serverPort + 1 // result must be one of the listed ports
+ } finally {
+ server.close()
+ }
+ }
+
+ @Test
+ public void testNoAvailablePorts() throws Throwable { // when every listed port is occupied the scanner must throw
+ ServerSocket server1 = new ServerSocket(0) // both sockets stay bound until the finally block
+ ServerSocket server2 = new ServerSocket(0)
+ try {
+ int serverPort1 = server1.getLocalPort()
+ int serverPort2 = server2.getLocalPort()
+
+ PortScanner portScanner = new PortScanner()
+ portScanner.setPortRange("" + serverPort1+ ", " + serverPort2) // only the two occupied ports are candidates
+ shouldFail(SliderException) { // the closure is expected to throw SliderException
+ portScanner.availablePort
+ }
+ } finally {
+ server1.close()
+ server2.close()
+ }
+ }
+
+ @Test
+ public void testPortRemovedFromRange() throws Throwable { // a port that was handed out must leave the remaining candidates
+ ServerSocket server = new ServerSocket(0) // port 0: the system picks the port; it stays bound for the whole test
+ try {
+ int serverPort = server.getLocalPort()
+
+ PortScanner portScanner = new PortScanner()
+ portScanner.setPortRange("" + (serverPort-1) + "-" + (serverPort + 3)) // five-port range that includes the occupied port
+ int port = portScanner.availablePort
+ assert port != serverPort
+ assert port >= serverPort -1 && port <= serverPort + 3
+ def isPortInList = port in portScanner.remainingPortsToCheck // membership test against the scanner's remaining list
+ assert !isPortInList
+ } finally {
+ server.close()
+ }
+ }
}