You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crail.apache.org by pe...@apache.org on 2018/02/23 13:52:10 UTC
incubator-crail git commit: Narpc: enable multiple core dispatcher
Repository: incubator-crail
Updated Branches:
refs/heads/master 7ea8753a6 -> d58abd828
Narpc: enable multiple core dispatcher
Enable the multi-core dispatcher when Crail is configured to use NaRPC
(for both the namenode RPC and the storage tier).
Fixes JIRA ticket CRAIL-9.
Close #5
Signed-off-by: Jonas Pfefferle <pe...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/d58abd82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/d58abd82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/d58abd82
Branch: refs/heads/master
Commit: d58abd828245815f450d2342ff5df6505ea704e6
Parents: 7ea8753
Author: Patrick Stuedi <st...@zurich.ibm.com>
Authored: Fri Feb 23 13:59:02 2018 +0100
Committer: Jonas Pfefferle <pe...@apache.org>
Committed: Fri Feb 23 14:51:18 2018 +0100
----------------------------------------------------------------------
pom.xml | 2 +-
.../crail/namenode/rpc/tcp/TcpNameNodeServer.java | 2 +-
.../crail/namenode/rpc/tcp/TcpRpcConstants.java | 7 +++++++
.../crail/storage/tcp/TcpStorageConstants.java | 15 +++++++++++++++
.../apache/crail/storage/tcp/TcpStorageServer.java | 2 +-
5 files changed, 25 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/d58abd82/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index eb64c90..720f078 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@
<dependency>
<groupId>com.ibm.narpc</groupId>
<artifactId>narpc</artifactId>
- <version>1.0</version>
+ <version>1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/d58abd82/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeServer.java
----------------------------------------------------------------------
diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeServer.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeServer.java
index 7c2f78d..60b1833 100644
--- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeServer.java
+++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeServer.java
@@ -49,7 +49,7 @@ public class TcpNameNodeServer extends RpcServer {
TcpRpcConstants.verify();
this.serverGroup = new NaRPCServerGroup<TcpNameNodeRequest, TcpNameNodeResponse>(
dispatcher, TcpRpcConstants.NAMENODE_TCP_QUEUEDEPTH,
- TcpRpcConstants.NAMENODE_TCP_MESSAGESIZE, true);
+ TcpRpcConstants.NAMENODE_TCP_MESSAGESIZE, true, TcpRpcConstants.NAMENODE_TCP_CORES);
this.serverEndpoint = serverGroup.createServerEndpoint();
InetSocketAddress inetSocketAddress = CrailUtils.getNameNodeAddress();
serverEndpoint.bind(inetSocketAddress);
http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/d58abd82/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConstants.java
----------------------------------------------------------------------
diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConstants.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConstants.java
index 407d366..05a0b10 100644
--- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConstants.java
+++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConstants.java
@@ -34,6 +34,9 @@ public class TcpRpcConstants {
public static final String NAMENODE_TCP_MESSAGESIZE_KEY = "crail.namenode.tcp.messageSize";
public static int NAMENODE_TCP_MESSAGESIZE = 512;
+ public static final String NAMENODE_TCP_CORES_KEY = "crail.namenode.tcp.cores";
+ public static int NAMENODE_TCP_CORES = 1;
+
public static void updateConstants(CrailConfiguration conf){
if (conf.get(NAMENODE_TCP_QUEUEDEPTH_KEY) != null) {
NAMENODE_TCP_QUEUEDEPTH = Integer.parseInt(conf.get(NAMENODE_TCP_QUEUEDEPTH_KEY));
@@ -41,6 +44,9 @@ public class TcpRpcConstants {
if (conf.get(NAMENODE_TCP_MESSAGESIZE_KEY) != null) {
NAMENODE_TCP_MESSAGESIZE = Integer.parseInt(conf.get(NAMENODE_TCP_MESSAGESIZE_KEY));
}
+ if (conf.get(NAMENODE_TCP_CORES_KEY) != null) {
+ NAMENODE_TCP_CORES = Integer.parseInt(conf.get(NAMENODE_TCP_CORES_KEY));
+ }
}
public static void verify() throws IOException {
@@ -49,5 +55,6 @@ public class TcpRpcConstants {
public static void printConf(Logger logger) {
LOG.info(NAMENODE_TCP_QUEUEDEPTH_KEY + " " + NAMENODE_TCP_QUEUEDEPTH);
LOG.info(NAMENODE_TCP_MESSAGESIZE_KEY + " " + NAMENODE_TCP_MESSAGESIZE);
+ LOG.info(NAMENODE_TCP_CORES_KEY + " " + NAMENODE_TCP_CORES);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/d58abd82/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageConstants.java
----------------------------------------------------------------------
diff --git a/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageConstants.java b/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageConstants.java
index 55c882e..885f8c1 100644
--- a/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageConstants.java
+++ b/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageConstants.java
@@ -53,11 +53,16 @@ public class TcpStorageConstants {
public static final String STORAGE_TCP_QUEUE_DEPTH_KEY = "crail.storage.tcp.queuedepth";
public static int STORAGE_TCP_QUEUE_DEPTH = 16;
+ public static final String STORAGE_TCP_CORES_KEY = "crail.storage.tcp.cores";
+ public static int STORAGE_TCP_CORES = 1;
+
public static void init(CrailConfiguration conf, String[] args) throws Exception {
if (args != null) {
Option portOption = Option.builder("p").desc("port to start server on").hasArg().build();
+ Option coresOption = Option.builder("c").desc("number of cores to use").hasArg().build();
Options options = new Options();
options.addOption(portOption);
+ options.addOption(coresOption);
CommandLineParser parser = new DefaultParser();
try {
@@ -67,6 +72,12 @@ public class TcpStorageConstants {
LOG.info("using custom port " + port);
conf.set(TcpStorageConstants.STORAGE_TCP_PORT_KEY, port);
}
+ if (line.hasOption(coresOption.getOpt())) {
+ String cores = line.getOptionValue(coresOption.getOpt());
+ LOG.info("number of cores used is " + cores);
+ conf.set(TcpStorageConstants.STORAGE_TCP_CORES_KEY, cores);
+ }
+
} catch (ParseException e) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("RDMA storage tier", options);
@@ -95,6 +106,9 @@ public class TcpStorageConstants {
}
if (conf.get(STORAGE_TCP_QUEUE_DEPTH_KEY) != null) {
STORAGE_TCP_QUEUE_DEPTH = Integer.parseInt(conf.get(STORAGE_TCP_QUEUE_DEPTH_KEY));
+ }
+ if (conf.get(STORAGE_TCP_CORES_KEY) != null) {
+ STORAGE_TCP_CORES = Integer.parseInt(conf.get(STORAGE_TCP_CORES_KEY));
}
}
@@ -105,6 +119,7 @@ public class TcpStorageConstants {
logger.info(STORAGE_TCP_ALLOCATION_SIZE_KEY + " " + STORAGE_TCP_ALLOCATION_SIZE);
logger.info(STORAGE_TCP_DATA_PATH_KEY + " " + STORAGE_TCP_DATA_PATH);
logger.info(STORAGE_TCP_QUEUE_DEPTH_KEY + " " + STORAGE_TCP_QUEUE_DEPTH);
+ logger.info(STORAGE_TCP_CORES_KEY + " " + STORAGE_TCP_CORES);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/d58abd82/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java
----------------------------------------------------------------------
diff --git a/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java b/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java
index 9ed2260..f415f07 100644
--- a/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java
+++ b/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java
@@ -61,7 +61,7 @@ public class TcpStorageServer implements Runnable, StorageServer, NaRPCService<T
public void init(CrailConfiguration conf, String[] args) throws Exception {
TcpStorageConstants.init(conf, args);
- this.serverGroup = new NaRPCServerGroup<TcpStorageRequest, TcpStorageResponse>(this, TcpStorageConstants.STORAGE_TCP_QUEUE_DEPTH, (int) CrailConstants.BLOCK_SIZE*2, false);
+ this.serverGroup = new NaRPCServerGroup<TcpStorageRequest, TcpStorageResponse>(this, TcpStorageConstants.STORAGE_TCP_QUEUE_DEPTH, (int) CrailConstants.BLOCK_SIZE*2, false, TcpStorageConstants.STORAGE_TCP_CORES);
this.serverEndpoint = serverGroup.createServerEndpoint();
this.address = getDataNodeAddress();
serverEndpoint.bind(address);