You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/06/27 06:39:31 UTC

[50/50] [abbrv] incubator-livy git commit: Livy:337 Binding RPCServer to user provided port and not random port (#334)

Livy:337 Binding RPCServer to user provided port and not random port (#334)

* Code changes in RPCserver for user provided port

* Indentation Changes

* Indentation Changes

* Indentation Changes

* Indentation Changes

* Configuring Port Range

* Documentation Changed

* launcher.port.range will take care of launching RPC

* Checkstyle changes

* Checkstyle changes

* Dummy push

* Code changes

* Changed BindException Handling to SocketException Handling

* Changed Import Order

* Code changes to increase port range

* Set Port isConntect to true

* Indentation Changes & port range in livy-client.conf.template

* Indentation changes

* Changed visibilty of method private

* Indentation Changes

* Indenetation Changes

* Unit test case to test port range

* Checkstyle changes

* Unit test case for port range

* Added comment for Port Range Configuration and increase port range for unit test case


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/9ae24d08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/9ae24d08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/9ae24d08

Branch: refs/heads/master
Commit: 9ae24d08738652ba5fd817780711d01b110d74a9
Parents: 02eef9a
Author: pralabhkumar <pr...@gmail.com>
Authored: Thu Jun 8 13:22:25 2017 +0530
Committer: Jeff Zhang <zj...@gmail.com>
Committed: Thu Jun 8 15:52:25 2017 +0800

----------------------------------------------------------------------
 conf/livy-client.conf.template                  |  5 +-
 .../java/com/cloudera/livy/rsc/RSCConf.java     |  4 +-
 .../com/cloudera/livy/rsc/rpc/RpcServer.java    | 86 ++++++++++++++++----
 .../java/com/cloudera/livy/rsc/rpc/TestRpc.java | 27 ++++++
 4 files changed, 104 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/9ae24d08/conf/livy-client.conf.template
----------------------------------------------------------------------
diff --git a/conf/livy-client.conf.template b/conf/livy-client.conf.template
index 2a92b57..06ad653 100644
--- a/conf/livy-client.conf.template
+++ b/conf/livy-client.conf.template
@@ -55,7 +55,8 @@
 
 # Address for the RSC driver to connect back with it's connection info.
 # livy.rsc.launcher.address =
-# livy.rsc.launcher.port = -1
+# Port Range on which RPC will launch . Port range in inclusive of start and end port .
+# livy.rsc.launcher.port.range = 10000~10110
 
 # How long will the RSC wait for a connection for a Livy server before shutting itself down.
 # livy.rsc.server.idle-timeout = 10m
@@ -83,4 +84,4 @@
 # livy.rsc.job-cancel.timeout = 30s
 
 # Number of statements kept in driver's memory
-# livy.rsc.retained-statements = 100
\ No newline at end of file
+# livy.rsc.retained-statements = 100

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/9ae24d08/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
index d1b8b39..afd935d 100644
--- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
+++ b/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
@@ -51,8 +51,10 @@ public class RSCConf extends ClientConf<RSCConf> {
 
     // Address for the RSC driver to connect back with it's connection info.
     LAUNCHER_ADDRESS("launcher.address", null),
+    LAUNCHER_PORT_RANGE("launcher.port.range", "10000~10010"),
+    // Setting up of this propety by user has no benefit. It is currently being used
+    // to pass  port information from ContextLauncher to RSCDriver
     LAUNCHER_PORT("launcher.port", -1),
-
     // How long will the RSC wait for a connection for a Livy server before shutting itself down.
     SERVER_IDLE_TIMEOUT("server.idle-timeout", "10m"),
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/9ae24d08/rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java b/rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java
index 1d3e6c5..44db976 100644
--- a/rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java
+++ b/rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java
@@ -19,7 +19,10 @@ package com.cloudera.livy.rsc.rpc;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.BindException;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketException;
 import java.security.SecureRandom;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -61,18 +64,78 @@ public class RpcServer implements Closeable {
   private static final SecureRandom RND = new SecureRandom();
 
   private final String address;
-  private final Channel channel;
+  private Channel channel;
   private final EventLoopGroup group;
   private final int port;
   private final ConcurrentMap<String, ClientInfo> pendingClients;
   private final RSCConf config;
-
+  private final String portRange;
+  private static enum PortRangeSchema{START_PORT, END_PORT, MAX};
+  private final String PORT_DELIMITER = "~";
+  /**
+   * Creating RPC Server
+   * @param lconf
+   * @throws IOException
+   * @throws InterruptedException
+   */
   public RpcServer(RSCConf lconf) throws IOException, InterruptedException {
     this.config = lconf;
+    this.portRange = config.get(LAUNCHER_PORT_RANGE);
     this.group = new NioEventLoopGroup(
-        this.config.getInt(RPC_MAX_THREADS),
-        Utils.newDaemonThreadFactory("RPC-Handler-%d"));
-    this.channel = new ServerBootstrap()
+      this.config.getInt(RPC_MAX_THREADS),
+      Utils.newDaemonThreadFactory("RPC-Handler-%d"));
+    int [] portData = getPortNumberAndRange();
+    int startingPortNumber = portData[PortRangeSchema.START_PORT.ordinal()];
+    int endPort = portData[PortRangeSchema.END_PORT.ordinal()];
+    boolean isContected = false;
+    for(int tries = startingPortNumber ; tries<=endPort ; tries++){
+      try {
+        this.channel = getChannel(tries);
+        isContected = true;
+        break;
+      } catch(SocketException e){
+        LOG.debug("RPC not able to connect port " + tries + " " + e.getMessage());
+      }
+    }
+    if(!isContected) {
+      throw new IOException("Unable to connect to provided ports " + this.portRange);
+    }
+    this.port = ((InetSocketAddress) channel.localAddress()).getPort();
+    this.pendingClients = new ConcurrentHashMap<>();
+    LOG.info("Connected to the port " + this.port);
+    String address = config.get(RPC_SERVER_ADDRESS);
+    if (address == null) {
+      address = config.findLocalAddress();
+    }
+    this.address = address;
+  }
+
+  /**
+   * Get Port Numbers
+   */
+  private int[] getPortNumberAndRange() throws ArrayIndexOutOfBoundsException,
+    NumberFormatException {
+    String[] split = this.portRange.split(PORT_DELIMITER);
+    int [] portRange = new int [PortRangeSchema.MAX.ordinal()];
+    try {
+      portRange[PortRangeSchema.START_PORT.ordinal()] =
+      Integer.parseInt(split[PortRangeSchema.START_PORT.ordinal()]);
+      portRange[PortRangeSchema.END_PORT.ordinal()] =
+      Integer.parseInt(split[PortRangeSchema.END_PORT.ordinal()]);
+    } catch(ArrayIndexOutOfBoundsException e) {
+      LOG.error("Port Range format is not correct " + this.portRange);
+      throw e;
+    } catch(NumberFormatException e) {
+      LOG.error("Port are not in numeric format " + this.portRange);
+      throw e;
+    }
+    return portRange;
+  }
+  /**
+   * @throws InterruptedException
+   **/
+  private Channel getChannel(int portNumber) throws BindException, InterruptedException {
+    Channel channel = new ServerBootstrap()
       .group(group)
       .channel(NioServerSocketChannel.class)
       .childHandler(new ChannelInitializer<SocketChannel>() {
@@ -97,19 +160,11 @@ public class RpcServer implements Closeable {
       .option(ChannelOption.SO_BACKLOG, 1)
       .option(ChannelOption.SO_REUSEADDR, true)
       .childOption(ChannelOption.SO_KEEPALIVE, true)
-      .bind(0)
+      .bind(portNumber)
       .sync()
       .channel();
-    this.port = ((InetSocketAddress) channel.localAddress()).getPort();
-    this.pendingClients = new ConcurrentHashMap<>();
-
-    String address = config.get(RPC_SERVER_ADDRESS);
-    if (address == null) {
-      address = config.findLocalAddress();
-    }
-    this.address = address;
+    return channel;
   }
-
   /**
    * Tells the RPC server to expect connections from clients.
    *
@@ -310,3 +365,4 @@ public class RpcServer implements Closeable {
   }
 
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/9ae24d08/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java
----------------------------------------------------------------------
diff --git a/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java b/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java
index cf19fee..48abe94 100644
--- a/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java
+++ b/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java
@@ -18,6 +18,8 @@
 package com.cloudera.livy.rsc.rpc;
 
 import java.io.Closeable;
+import java.io.IOException;
+import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
@@ -186,6 +188,31 @@ public class TestRpc {
     assertEquals(outbound.message, reply.message);
   }
 
+  @Test
+  public void testPortRange() throws Exception {
+    String portRange = "a~b";
+    emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
+    try {
+      autoClose(new RpcServer(emptyConfig));
+    } catch (Exception ee) {
+      assertTrue(ee instanceof NumberFormatException);
+    }
+    portRange = "11000";
+    emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
+    try {
+      autoClose(new RpcServer(emptyConfig));
+    } catch (Exception ee) {
+      assertTrue(ee instanceof ArrayIndexOutOfBoundsException);
+    }
+    portRange = "11000~11110";
+    emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
+    String [] portRangeData = portRange.split("~");
+    int startPort = Integer.parseInt(portRangeData[0]);
+    int endPort = Integer.parseInt(portRangeData[1]);
+    RpcServer server = autoClose(new RpcServer(emptyConfig));
+    assertTrue(startPort <= server.getPort() && server.getPort() <= endPort);
+  }
+
   private void transfer(Rpc serverRpc, Rpc clientRpc) {
     EmbeddedChannel client = (EmbeddedChannel) clientRpc.getChannel();
     EmbeddedChannel server = (EmbeddedChannel) serverRpc.getChannel();