You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/06/27 06:39:31 UTC
[50/50] [abbrv] incubator-livy git commit: Livy:337 Binding RPCServer
to user provided port and not random port (#334)
Livy:337 Binding RPCServer to user provided port and not random port (#334)
* Code changes in RPCserver for user provided port
* Indentation Changes
* Indentation Changes
* Indentation Changes
* Indentation Changes
* Configuring Port Range
* Documentation Changed
* launcher.port.range will take care of launching RPC
* Checkstyle changes
* Checkstyle changes
* Dummy push
* Code changes
* Changed BindException Handling to SocketException Handling
* Changed Import Order
* Code changes to increase port range
* Set Port isConntect to true
* Indentation Changes & port range in livy-client.conf.template
* Indentation changes
* Changed visibility of method to private
* Indentation Changes
* Indentation Changes
* Unit test case to test port range
* Checkstyle changes
* Unit test case for port range
* Added comment for Port Range Configuration and increased port range for unit test case
Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/9ae24d08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/9ae24d08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/9ae24d08
Branch: refs/heads/master
Commit: 9ae24d08738652ba5fd817780711d01b110d74a9
Parents: 02eef9a
Author: pralabhkumar <pr...@gmail.com>
Authored: Thu Jun 8 13:22:25 2017 +0530
Committer: Jeff Zhang <zj...@gmail.com>
Committed: Thu Jun 8 15:52:25 2017 +0800
----------------------------------------------------------------------
conf/livy-client.conf.template | 5 +-
.../java/com/cloudera/livy/rsc/RSCConf.java | 4 +-
.../com/cloudera/livy/rsc/rpc/RpcServer.java | 86 ++++++++++++++++----
.../java/com/cloudera/livy/rsc/rpc/TestRpc.java | 27 ++++++
4 files changed, 104 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/9ae24d08/conf/livy-client.conf.template
----------------------------------------------------------------------
diff --git a/conf/livy-client.conf.template b/conf/livy-client.conf.template
index 2a92b57..06ad653 100644
--- a/conf/livy-client.conf.template
+++ b/conf/livy-client.conf.template
@@ -55,7 +55,8 @@
# Address for the RSC driver to connect back with it's connection info.
# livy.rsc.launcher.address =
-# livy.rsc.launcher.port = -1
+# Port range on which the RPC server will launch. The range is inclusive of the start and end ports.
+# livy.rsc.launcher.port.range = 10000~10110
# How long will the RSC wait for a connection for a Livy server before shutting itself down.
# livy.rsc.server.idle-timeout = 10m
@@ -83,4 +84,4 @@
# livy.rsc.job-cancel.timeout = 30s
# Number of statements kept in driver's memory
-# livy.rsc.retained-statements = 100
\ No newline at end of file
+# livy.rsc.retained-statements = 100
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/9ae24d08/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
index d1b8b39..afd935d 100644
--- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
+++ b/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
@@ -51,8 +51,10 @@ public class RSCConf extends ClientConf<RSCConf> {
// Address for the RSC driver to connect back with it's connection info.
LAUNCHER_ADDRESS("launcher.address", null),
+ LAUNCHER_PORT_RANGE("launcher.port.range", "10000~10010"),
+ // Setting this property by the user has no benefit. It is currently being used
+ // to pass port information from ContextLauncher to RSCDriver
LAUNCHER_PORT("launcher.port", -1),
-
// How long will the RSC wait for a connection for a Livy server before shutting itself down.
SERVER_IDLE_TIMEOUT("server.idle-timeout", "10m"),
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/9ae24d08/rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java b/rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java
index 1d3e6c5..44db976 100644
--- a/rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java
+++ b/rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java
@@ -19,7 +19,10 @@ package com.cloudera.livy.rsc.rpc;
import java.io.Closeable;
import java.io.IOException;
+import java.net.BindException;
import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketException;
import java.security.SecureRandom;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -61,18 +64,78 @@ public class RpcServer implements Closeable {
private static final SecureRandom RND = new SecureRandom();
private final String address;
- private final Channel channel;
+ private Channel channel;
private final EventLoopGroup group;
private final int port;
private final ConcurrentMap<String, ClientInfo> pendingClients;
private final RSCConf config;
-
+ private final String portRange;
+ private static enum PortRangeSchema{START_PORT, END_PORT, MAX};
+ private final String PORT_DELIMITER = "~";
+ /**
+ * Creating RPC Server
+ * @param lconf
+ * @throws IOException
+ * @throws InterruptedException
+ */
public RpcServer(RSCConf lconf) throws IOException, InterruptedException {
this.config = lconf;
+ this.portRange = config.get(LAUNCHER_PORT_RANGE);
this.group = new NioEventLoopGroup(
- this.config.getInt(RPC_MAX_THREADS),
- Utils.newDaemonThreadFactory("RPC-Handler-%d"));
- this.channel = new ServerBootstrap()
+ this.config.getInt(RPC_MAX_THREADS),
+ Utils.newDaemonThreadFactory("RPC-Handler-%d"));
+ int [] portData = getPortNumberAndRange();
+ int startingPortNumber = portData[PortRangeSchema.START_PORT.ordinal()];
+ int endPort = portData[PortRangeSchema.END_PORT.ordinal()];
+ boolean isContected = false;
+ for(int tries = startingPortNumber ; tries<=endPort ; tries++){
+ try {
+ this.channel = getChannel(tries);
+ isContected = true;
+ break;
+ } catch(SocketException e){
+ LOG.debug("RPC not able to connect port " + tries + " " + e.getMessage());
+ }
+ }
+ if(!isContected) {
+ throw new IOException("Unable to connect to provided ports " + this.portRange);
+ }
+ this.port = ((InetSocketAddress) channel.localAddress()).getPort();
+ this.pendingClients = new ConcurrentHashMap<>();
+ LOG.info("Connected to the port " + this.port);
+ String address = config.get(RPC_SERVER_ADDRESS);
+ if (address == null) {
+ address = config.findLocalAddress();
+ }
+ this.address = address;
+ }
+
+ /**
+ * Get Port Numbers
+ */
+ private int[] getPortNumberAndRange() throws ArrayIndexOutOfBoundsException,
+ NumberFormatException {
+ String[] split = this.portRange.split(PORT_DELIMITER);
+ int [] portRange = new int [PortRangeSchema.MAX.ordinal()];
+ try {
+ portRange[PortRangeSchema.START_PORT.ordinal()] =
+ Integer.parseInt(split[PortRangeSchema.START_PORT.ordinal()]);
+ portRange[PortRangeSchema.END_PORT.ordinal()] =
+ Integer.parseInt(split[PortRangeSchema.END_PORT.ordinal()]);
+ } catch(ArrayIndexOutOfBoundsException e) {
+ LOG.error("Port Range format is not correct " + this.portRange);
+ throw e;
+ } catch(NumberFormatException e) {
+ LOG.error("Port are not in numeric format " + this.portRange);
+ throw e;
+ }
+ return portRange;
+ }
+ /**
+ * @throws InterruptedException
+ **/
+ private Channel getChannel(int portNumber) throws BindException, InterruptedException {
+ Channel channel = new ServerBootstrap()
.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@@ -97,19 +160,11 @@ public class RpcServer implements Closeable {
.option(ChannelOption.SO_BACKLOG, 1)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
- .bind(0)
+ .bind(portNumber)
.sync()
.channel();
- this.port = ((InetSocketAddress) channel.localAddress()).getPort();
- this.pendingClients = new ConcurrentHashMap<>();
-
- String address = config.get(RPC_SERVER_ADDRESS);
- if (address == null) {
- address = config.findLocalAddress();
- }
- this.address = address;
+ return channel;
}
-
/**
* Tells the RPC server to expect connections from clients.
*
@@ -310,3 +365,4 @@ public class RpcServer implements Closeable {
}
}
+
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/9ae24d08/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java
----------------------------------------------------------------------
diff --git a/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java b/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java
index cf19fee..48abe94 100644
--- a/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java
+++ b/rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java
@@ -18,6 +18,8 @@
package com.cloudera.livy.rsc.rpc;
import java.io.Closeable;
+import java.io.IOException;
+import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
@@ -186,6 +188,31 @@ public class TestRpc {
assertEquals(outbound.message, reply.message);
}
+ @Test
+ public void testPortRange() throws Exception {
+ String portRange = "a~b";
+ emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
+ try {
+ autoClose(new RpcServer(emptyConfig));
+ } catch (Exception ee) {
+ assertTrue(ee instanceof NumberFormatException);
+ }
+ portRange = "11000";
+ emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
+ try {
+ autoClose(new RpcServer(emptyConfig));
+ } catch (Exception ee) {
+ assertTrue(ee instanceof ArrayIndexOutOfBoundsException);
+ }
+ portRange = "11000~11110";
+ emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
+ String [] portRangeData = portRange.split("~");
+ int startPort = Integer.parseInt(portRangeData[0]);
+ int endPort = Integer.parseInt(portRangeData[1]);
+ RpcServer server = autoClose(new RpcServer(emptyConfig));
+ assertTrue(startPort <= server.getPort() && server.getPort() <= endPort);
+ }
+
private void transfer(Rpc serverRpc, Rpc clientRpc) {
EmbeddedChannel client = (EmbeddedChannel) clientRpc.getChannel();
EmbeddedChannel server = (EmbeddedChannel) serverRpc.getChannel();