You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by jm...@apache.org on 2014/10/22 21:10:15 UTC

git commit: SLIDER-505 enable port range specification for AM

Repository: incubator-slider
Updated Branches:
  refs/heads/develop e13d20e3a -> 6be4bfadc


SLIDER-505 enable port range specification for AM


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/6be4bfad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/6be4bfad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/6be4bfad

Branch: refs/heads/develop
Commit: 6be4bfadcec39f6912861277d74e0d8e0a46e603
Parents: e13d20e
Author: Jon Maron <jm...@hortonworks.com>
Authored: Wed Oct 22 15:09:54 2014 -0400
Committer: Jon Maron <jm...@hortonworks.com>
Committed: Wed Oct 22 15:09:54 2014 -0400

----------------------------------------------------------------------
 .../org/apache/slider/common/SliderKeys.java    |  2 +
 .../apache/slider/common/tools/PortScanner.java | 90 +++++++++++++++++++
 .../server/appmaster/SliderAppMaster.java       | 30 ++++++-
 .../slider/common/tools/TestPortScan.groovy     | 91 +++++++++++++++++++-
 4 files changed, 208 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6be4bfad/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
index e75ec73..ae58ef3 100644
--- a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
+++ b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
@@ -182,4 +182,6 @@ public interface SliderKeys extends SliderXmlConfKeys {
    */
   String AM_FILTER_NAME =
       "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer";
+
+  String KEY_AM_ALLOWED_PORT_RANGE = "slider.am.allowed.port.range";
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6be4bfad/slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java b/slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java
new file mode 100644
index 0000000..0f4cfbc
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.common.tools;
+
+import org.apache.slider.common.SliderExitCodes;
+import org.apache.slider.core.exceptions.SliderException;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ *
+ */
+public class PortScanner {
+  private static Pattern NUMBER_RANGE = Pattern.compile("^(\\d+)\\s*-\\s*(\\d+)$");
+  private static Pattern SINGLE_NUMBER = Pattern.compile("^\\d+$");
+
+  private List<Integer> remainingPortsToCheck;
+
+  public PortScanner() {
+  }
+
+  public void setPortRange(String input) {
+    // first split based on commas
+    Set<Integer> inputPorts= new TreeSet<Integer>();
+    String[] ranges = input.split(",");
+    for ( String range : ranges ) {
+      Matcher m = SINGLE_NUMBER.matcher(range.trim());
+      if (m.find()) {
+        inputPorts.add(Integer.parseInt(m.group()));
+      } else {
+        m = NUMBER_RANGE.matcher(range.trim());
+        if (m.find()) {
+          String[] boundaryValues = m.group(0).split("-");
+          int start = Integer.parseInt(boundaryValues[0].trim());
+          int end = Integer.parseInt(boundaryValues[1].trim());
+          for (int i = start; i < end + 1; i++) {
+            inputPorts.add(i);
+          }
+        }
+      }
+    }
+    this.remainingPortsToCheck = new ArrayList<Integer>(inputPorts);
+  }
+
+  public List<Integer> getRemainingPortsToCheck() {
+    return remainingPortsToCheck;
+  }
+
+  public int getAvailablePort () throws SliderException{
+    boolean found = false;
+    int availablePort = -1;
+    Iterator<Integer> portsToCheck = this.remainingPortsToCheck.iterator();
+    while (portsToCheck.hasNext() && !found) {
+      int portToCheck = portsToCheck.next();
+      found = SliderUtils.isPortAvailable(portToCheck);
+      if (found) {
+        availablePort = portToCheck;
+        portsToCheck.remove();
+      }
+    }
+
+    if (availablePort < 0) {
+      throw new SliderException(SliderExitCodes.EXIT_BAD_CONFIGURATION,
+        "No available ports found in configured range {}",
+        remainingPortsToCheck);
+    }
+
+    return availablePort;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6be4bfad/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index d696f45..e7fa109 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -88,6 +88,7 @@ import org.apache.slider.common.params.SliderAMArgs;
 import org.apache.slider.common.params.SliderAMCreateAction;
 import org.apache.slider.common.params.SliderActions;
 import org.apache.slider.common.tools.ConfigHelper;
+import org.apache.slider.common.tools.PortScanner;
 import org.apache.slider.common.tools.SliderFileSystem;
 import org.apache.slider.common.tools.SliderUtils;
 import org.apache.slider.common.tools.SliderVersionInfo;
@@ -375,6 +376,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   private YarnRegistryViewForProviders yarnRegistryOperations;
   private FsDelegationTokenManager fsDelegationTokenManager;
   private RegisterApplicationMasterResponse amRegistrationData;
+  private PortScanner portScanner;
 
   /**
    * Service Constructor
@@ -656,7 +658,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
       secretManager = new ClientToAMTokenSecretManager(appAttemptID, null);
 
       //bring up the Slider RPC service
-      startSliderRPCServer();
+      startSliderRPCServer(instanceDefinition);
 
       rpcServiceAddress = rpcService.getConnectAddress();
       appMasterHostname = rpcServiceAddress.getHostName();
@@ -690,6 +692,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
 
       startAgentWebApp(appInformation, serviceConf);
 
+      int port = getPortToRequest(instanceDefinition);
+
       webApp = new SliderAMWebApp(registryOperations);
       WebApps.$for(SliderAMWebApp.BASE_PATH, WebAppApi.class,
                    new WebAppApiImpl(this,
@@ -698,6 +702,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
                                      certificateManager, registryOperations),
                    RestPaths.WS_CONTEXT)
                       .withHttpPolicy(serviceConf, HttpConfig.Policy.HTTP_ONLY)
+                      .at(port)
                       .start(webApp);
       String scheme = WebAppUtils.HTTP_PREFIX;
       appMasterTrackingUrl = scheme  + appMasterHostname + ":" + webApp.port();
@@ -900,6 +905,23 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
     return finish();
   }
 
+  private int getPortToRequest(AggregateConf instanceDefinition)
+      throws SliderException {
+    int portToRequest = 0;
+    String portRange = instanceDefinition.
+        getAppConfOperations().getComponent(SliderKeys.COMPONENT_AM)
+        .getOption(SliderKeys.KEY_AM_ALLOWED_PORT_RANGE , "0");
+    if (!"0".equals(portRange)) {
+      if (portScanner == null) {
+        portScanner = new PortScanner();
+        portScanner.setPortRange(portRange);
+      }
+      portToRequest = portScanner.getAvailablePort();
+    }
+
+    return portToRequest;
+  }
+
   private void uploadServerCertForLocalization(String clustername,
                                                SliderFileSystem fs)
       throws IOException {
@@ -1334,7 +1356,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   /**
    * Start the slider RPC server
    */
-  private void startSliderRPCServer() throws IOException, BadConfigException {
+  private void startSliderRPCServer(AggregateConf instanceDefinition)
+      throws IOException, SliderException {
 
     // verify that if the cluster is authed, the ACLs are set.
     boolean authorization = getConfig().getBoolean(
@@ -1352,9 +1375,10 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
         .newReflectiveBlockingService(
             protobufRelay);
 
+    int port = getPortToRequest(instanceDefinition);
     rpcService =
         new WorkflowRpcService("SliderRPC", RpcBinder.createProtobufServer(
-            new InetSocketAddress("0.0.0.0", 0),
+            new InetSocketAddress("0.0.0.0", port),
             getConfig(),
             secretManager,
             NUM_RPC_HANDLERS,

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6be4bfad/slider-core/src/test/groovy/org/apache/slider/common/tools/TestPortScan.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/common/tools/TestPortScan.groovy b/slider-core/src/test/groovy/org/apache/slider/common/tools/TestPortScan.groovy
index 49bd58e..f009e25 100644
--- a/slider-core/src/test/groovy/org/apache/slider/common/tools/TestPortScan.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/common/tools/TestPortScan.groovy
@@ -18,11 +18,11 @@
 
 package org.apache.slider.common.tools
 
-import groovy.transform.CompileStatic
+import org.apache.slider.core.exceptions.SliderException
 import org.junit.Test
 
-@CompileStatic
 class TestPortScan {
+  final shouldFail = new GroovyTestCase().&shouldFail
 
   @Test
   public void testScanPorts() throws Throwable {
@@ -38,4 +38,91 @@ class TestPortScan {
       server.close()
     }
   }
+
+  @Test
+  public void testRequestedPortsLogic() throws Throwable {
+    PortScanner portScanner = new PortScanner()
+    portScanner.setPortRange("5,6,8-10, 11,14 ,20 - 22")
+    List<Integer> ports = portScanner.remainingPortsToCheck
+    def expectedPorts = [5,6,8,9,10,11,14,20,21,22]
+    assert ports == expectedPorts
+  }
+
+  @Test
+  public void testRequestedPortsOutOfOrder() throws Throwable {
+    PortScanner portScanner = new PortScanner()
+    portScanner.setPortRange("8-10,5,6, 11,20 - 22, 14 ")
+    List<Integer> ports = portScanner.remainingPortsToCheck
+    def expectedPorts = [5,6,8,9,10,11,14,20,21,22]
+    assert ports == expectedPorts
+  }
+
+  @Test
+  public void testFindAvailablePortInRange() throws Throwable {
+    ServerSocket server = new ServerSocket(0)
+    try {
+      int serverPort = server.getLocalPort()
+
+      PortScanner portScanner = new PortScanner()
+      portScanner.setPortRange("" + (serverPort-1) + "-" + (serverPort + 3))
+      int port = portScanner.availablePort
+      assert port != serverPort
+      assert port >= serverPort -1 && port <= serverPort + 3
+    } finally {
+      server.close()
+    }
+  }
+
+  @Test
+  public void testFindAvailablePortInList() throws Throwable {
+    ServerSocket server = new ServerSocket(0)
+    try {
+      int serverPort = server.getLocalPort()
+
+      PortScanner portScanner = new PortScanner()
+      portScanner.setPortRange("" + (serverPort-1) + ", " + (serverPort + 1))
+      int port = portScanner.availablePort
+      assert port != serverPort
+      assert port == serverPort -1 || port == serverPort + 1
+    } finally {
+      server.close()
+    }
+  }
+
+  @Test
+  public void testNoAvailablePorts() throws Throwable {
+    ServerSocket server1 = new ServerSocket(0)
+    ServerSocket server2 = new ServerSocket(0)
+    try {
+      int serverPort1 = server1.getLocalPort()
+      int serverPort2 = server2.getLocalPort()
+
+      PortScanner portScanner = new PortScanner()
+      portScanner.setPortRange("" + serverPort1+ ", " + serverPort2)
+      shouldFail(SliderException) {
+        portScanner.availablePort
+      }
+    } finally {
+      server1.close()
+      server2.close()
+    }
+  }
+
+  @Test
+  public void testPortRemovedFromRange() throws Throwable {
+    ServerSocket server = new ServerSocket(0)
+    try {
+      int serverPort = server.getLocalPort()
+
+      PortScanner portScanner = new PortScanner()
+      portScanner.setPortRange("" + (serverPort-1) + "-" + (serverPort + 3))
+      int port = portScanner.availablePort
+      assert port != serverPort
+      assert port >= serverPort -1 && port <= serverPort + 3
+      def isPortInList = port in portScanner.remainingPortsToCheck
+      assert !isPortInList
+    } finally {
+      server.close()
+    }
+  }
 }